some cleanup, seperating classes into their own files.

This commit is contained in:
2026-03-11 12:14:30 +01:00
parent 7301186102
commit 00c111da73
13 changed files with 1057 additions and 1033 deletions

View File

@@ -31,31 +31,20 @@ class AboutDialog(wx.Dialog):
info_text = wx.StaticText(self, label="wxIRC Client") info_text = wx.StaticText(self, label="wxIRC Client")
info_font = wx.Font(14, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD) info_font = wx.Font(14, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)
info_text.SetFont(info_font) info_text.SetFont(info_font)
sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
import base64, hashlib version_text = wx.StaticText(self, label="Version : 0.2.3") # COMMIT_HUNDRED:COMMIT_TEN:COMMIT_ONE e.g 23 commits -> 0.2.3
self.encoded = base64.b64encode(
hashlib.sha256(f"{self.GetId()}-{self.GetHandle()}".encode()).digest()
).decode('utf-8')
version_text = wx.StaticText(self, label=f"wxIRC V:e1")
version_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL) version_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
version_text.SetFont(version_font) version_text.SetFont(version_font)
rand_hash = wx.StaticText(self, label=f"{self.encoded}")
rand_hash.SetFont(version_font)
contrubutors_text = wx.StaticText(self, label="This software may not be used for commercial purposes. \n And may not be distributed for commercial purposes.")
contrubutors_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
contrubutors_text.SetFont(contrubutors_font)
# Add info to sizer
sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
sizer.Add(version_text, 0, wx.ALL | wx.ALIGN_CENTER, 5) sizer.Add(version_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
sizer.Add(rand_hash, 0, wx.ALL | wx.ALIGN_CENTER, 5)
sizer.Add(contrubutors_text, 0, wx.ALL | wx.ALIGN_CENTER, 5) contributors_text = wx.StaticText(self, label="""
Contributors:
* rattatwinko (rattatwinko.servecounterstrike.com)
""")
contributors_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
contributors_text.SetFont(contributors_font)
sizer.Add(contributors_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
# OK button # OK button
ok_btn = wx.Button(self, wx.ID_OK, "OK") ok_btn = wx.Button(self, wx.ID_OK, "OK")

161
src/CommandAutocomplete.py Normal file
View File

@@ -0,0 +1,161 @@
import wx
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class CommandAutocomplete(wx.PopupTransientWindow):
"""Popup window for IRC command autocomplete, similar to Minecraft."""
def __init__(self, parent, commands, on_select_callback):
super().__init__(parent, wx.BORDER_SIMPLE)
self.on_select_callback = on_select_callback
self.commands = commands # List of (command, description) tuples
self.filtered_commands = commands.copy()
self.selected_index = 0
self._init_ui()
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
def _init_ui(self):
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Command list
self.list_ctrl = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER)
self.list_ctrl.InsertColumn(0, "Command", width=120)
self.list_ctrl.InsertColumn(1, "Description", width=280)
self._update_list()
# Bind events
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_item_activated)
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
main_sizer.Add(self.list_ctrl, 1, wx.EXPAND | wx.ALL, 2)
panel.SetSizer(main_sizer)
main_sizer.Fit(panel)
self.SetClientSize(panel.GetBestSize())
# Select first item
if self.list_ctrl.GetItemCount() > 0:
self.list_ctrl.Select(0)
self.selected_index = 0
def _update_list(self):
"""Update the list with filtered commands."""
self.list_ctrl.DeleteAllItems()
for cmd, desc in self.filtered_commands:
idx = self.list_ctrl.InsertItem(self.list_ctrl.GetItemCount(), cmd)
self.list_ctrl.SetItem(idx, 1, desc)
# Resize to fit content (max 8 items visible)
item_height = 20
max_items = min(8, len(self.filtered_commands))
self.list_ctrl.SetSize((410, item_height * max_items + 4))
self.SetClientSize((410, item_height * max_items + 4))
def filter_commands(self, search_text):
"""Filter commands based on search text."""
search_lower = search_text.lower().strip()
if not search_lower:
self.filtered_commands = self.commands.copy()
else:
self.filtered_commands = [
(cmd, desc) for cmd, desc in self.commands
if cmd.lower().startswith(search_lower)
]
self.selected_index = 0
self._update_list()
if self.list_ctrl.GetItemCount() > 0:
self.list_ctrl.Select(0)
def on_item_activated(self, event):
"""Handle double-click or Enter on item."""
idx = event.GetIndex()
if 0 <= idx < len(self.filtered_commands):
cmd, _ = self.filtered_commands[idx]
if self.on_select_callback:
self.on_select_callback(cmd)
self.safe_dismiss()
def on_item_selected(self, event):
"""Handle item selection."""
self.selected_index = event.GetIndex()
def select_next(self):
"""Select next item."""
if self.list_ctrl.GetItemCount() > 0:
self.selected_index = (self.selected_index + 1) % self.list_ctrl.GetItemCount()
self.list_ctrl.Select(self.selected_index)
self.list_ctrl.EnsureVisible(self.selected_index)
def select_previous(self):
"""Select previous item."""
if self.list_ctrl.GetItemCount() > 0:
self.selected_index = (self.selected_index - 1) % self.list_ctrl.GetItemCount()
self.list_ctrl.Select(self.selected_index)
self.list_ctrl.EnsureVisible(self.selected_index)
def get_selected_command(self):
"""Get the currently selected command."""
if 0 <= self.selected_index < len(self.filtered_commands):
return self.filtered_commands[self.selected_index][0]
return None
def on_char_hook(self, event):
"""Handle keyboard events."""
keycode = event.GetKeyCode()
if keycode == wx.WXK_ESCAPE:
self.safe_dismiss()
elif keycode == wx.WXK_UP:
self.select_previous()
elif keycode == wx.WXK_DOWN:
self.select_next()
elif keycode == wx.WXK_RETURN or keycode == wx.WXK_NUMPAD_ENTER:
cmd = self.get_selected_command()
if cmd and self.on_select_callback:
self.on_select_callback(cmd)
self.safe_dismiss()
else:
event.Skip()
def on_kill_focus(self, event):
"""Handle focus loss."""
focused = wx.Window.FindFocus()
if focused and (focused == self.list_ctrl or self.IsDescendant(focused)):
event.Skip()
return
wx.CallLater(100, self.safe_dismiss)
event.Skip()
def on_destroy(self, event):
event.Skip()
def safe_dismiss(self):
"""Safely dismiss the popup."""
if not self.IsBeingDeleted() and self.IsShown():
try:
self.Dismiss()
except Exception as e:
logger.error(f"Error dismissing command autocomplete: {e}")
def show_at_input(self, input_ctrl):
"""Show popup near the input control."""
input_screen_pos = input_ctrl.ClientToScreen((0, 0))
popup_size = self.GetSize()
display_size = wx.GetDisplaySize()
# Show above the input, otherwise below
if input_screen_pos.y - popup_size.height > 0:
popup_y = input_screen_pos.y - popup_size.height - 2
else:
popup_y = input_screen_pos.y + input_ctrl.GetSize().height + 2
# Keep popup on screen horizontally
popup_x = max(10, min(input_screen_pos.x, display_size.x - popup_size.width - 10))
self.Position((popup_x, popup_y), (0, 0))
self.Popup()

View File

@@ -5,358 +5,13 @@ import logging
from SearchDialog import SearchDialog from SearchDialog import SearchDialog
import traceback import traceback
import KaomojiPicker as KaomojiPicker
import CommandAutocomplete as CommandAutocomplete
# Set up logging # Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class KaomojiPicker(wx.PopupTransientWindow):
def __init__(self, parent, kaomoji_groups, on_select_callback):
super().__init__(parent, wx.BORDER_SIMPLE)
self.on_select_callback = on_select_callback
self.kaomoji_groups = kaomoji_groups
self._init_ui()
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
def _init_ui(self):
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Scrolled content area
self.scroll = wx.ScrolledWindow(panel, size=(380, 420), style=wx.VSCROLL)
self.scroll.SetScrollRate(0, 15)
self.scroll_sizer = wx.BoxSizer(wx.VERTICAL)
# Storage for filtering
self.all_buttons = []
self.group_headers = {}
self.group_containers = {}
# Build kaomoji groups
self._build_kaomoji_groups()
self.scroll.SetSizer(self.scroll_sizer)
main_sizer.Add(self.scroll, 1, wx.EXPAND)
panel.SetSizer(main_sizer)
main_sizer.Fit(panel)
self.SetClientSize(panel.GetBestSize())
# Bind events
self._bind_events(panel)
def _build_kaomoji_groups(self):
dc = wx.ClientDC(self.scroll)
dc.SetFont(self.scroll.GetFont())
self.scroll_sizer.AddSpacer(8)
for group_name, kaomojis in self.kaomoji_groups.items():
if not kaomojis:
continue
# Group header
header = self._create_group_header(group_name)
self.group_headers[group_name] = header
self.scroll_sizer.Add(header, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 12)
# Wrap sizer for buttons
wrap_sizer = wx.WrapSizer(wx.HORIZONTAL)
for kaomoji in kaomojis:
btn = self._create_kaomoji_button(kaomoji, dc, group_name, wrap_sizer)
self.all_buttons.append(btn)
wrap_sizer.Add(btn, 0, wx.ALL, 3)
self.group_containers[group_name] = wrap_sizer
self.scroll_sizer.Add(wrap_sizer, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 12)
def _create_group_header(self, group_name):
header = wx.StaticText(self.scroll, label=group_name)
font = header.GetFont()
font.SetWeight(wx.FONTWEIGHT_BOLD)
font.SetPointSize(9)
header.SetFont(font)
return header
def _create_kaomoji_button(self, kaomoji, dc, group_name, wrap_sizer):
text_width, text_height = dc.GetTextExtent(kaomoji)
btn_width = min(max(text_width + 20, 55), 140)
btn_height = text_height + 14
btn = wx.Button(self.scroll, label=kaomoji, size=(btn_width, btn_height))
btn.SetToolTip(f"{kaomoji} - {group_name}")
btn._kaomoji_text = kaomoji.lower()
btn._kaomoji_group = group_name.lower()
btn._group_name = group_name
# Bind click event
btn.Bind(wx.EVT_BUTTON, lambda evt, k=kaomoji: self.on_kaomoji_selected(k))
return btn
def _bind_events(self, panel):
panel.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
def on_kaomoji_selected(self, kaomoji):
try:
if self.on_select_callback:
self.on_select_callback(kaomoji)
except Exception as e:
logger.error(f"Error in kaomoji selection callback: {e}")
finally:
# Safely dismiss the popup
wx.CallAfter(self.safe_dismiss)
def on_search(self, event):
try:
search_text = self.search_ctrl.GetValue().lower().strip()
if not search_text:
self._show_all()
return
visible_groups = set()
for btn in self.all_buttons:
# Match kaomoji text or group name
is_match = (search_text in btn._kaomoji_text or
search_text in btn._kaomoji_group)
btn.Show(is_match)
if is_match:
visible_groups.add(btn._group_name)
# Show/hide group headers
for group_name, header in self.group_headers.items():
header.Show(group_name in visible_groups)
self.scroll.Layout()
self.scroll.FitInside()
except Exception as e:
logger.error(f"Error in kaomoji search: {e}")
def on_clear_search(self, event):
self.search_ctrl.SetValue("")
self._show_all()
def _show_all(self):
for btn in self.all_buttons:
btn.Show()
for header in self.group_headers.values():
header.Show()
self.scroll.Layout()
self.scroll.FitInside()
def on_char_hook(self, event):
if event.GetKeyCode() == wx.WXK_ESCAPE:
self.safe_dismiss()
else:
event.Skip()
def on_kill_focus(self, event):
focused = wx.Window.FindFocus()
if focused and (focused == self.search_ctrl or
focused.GetParent() == self.scroll or
self.scroll.IsDescendant(focused)):
event.Skip()
return
wx.CallLater(100, self.safe_dismiss)
event.Skip()
def on_destroy(self, event):
self.all_buttons.clear()
self.group_headers.clear()
self.group_containers.clear()
event.Skip()
def safe_dismiss(self):
if not self.IsBeingDeleted() and self.IsShown():
try:
self.Dismiss()
except Exception as e:
logger.error(f"Error dismissing popup: {e}")
def show_at_button(self, button):
btn_screen_pos = button.ClientToScreen((0, 0))
popup_size = self.GetSize()
display_size = wx.GetDisplaySize()
# Try to show above the button, otherwise below
if btn_screen_pos.y - popup_size.height > 0:
popup_y = btn_screen_pos.y - popup_size.height
else:
popup_y = btn_screen_pos.y + button.GetSize().height
# Keep popup on screen horizontally
popup_x = max(10, min(btn_screen_pos.x, display_size.x - popup_size.width - 10))
self.Position((popup_x, popup_y), (0, 0))
self.Popup()
class CommandAutocomplete(wx.PopupTransientWindow):
"""Popup window for IRC command autocomplete, similar to Minecraft."""
def __init__(self, parent, commands, on_select_callback):
super().__init__(parent, wx.BORDER_SIMPLE)
self.on_select_callback = on_select_callback
self.commands = commands # List of (command, description) tuples
self.filtered_commands = commands.copy()
self.selected_index = 0
self._init_ui()
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
def _init_ui(self):
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Command list
self.list_ctrl = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER)
self.list_ctrl.InsertColumn(0, "Command", width=120)
self.list_ctrl.InsertColumn(1, "Description", width=280)
self._update_list()
# Bind events
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_item_activated)
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
main_sizer.Add(self.list_ctrl, 1, wx.EXPAND | wx.ALL, 2)
panel.SetSizer(main_sizer)
main_sizer.Fit(panel)
self.SetClientSize(panel.GetBestSize())
# Select first item
if self.list_ctrl.GetItemCount() > 0:
self.list_ctrl.Select(0)
self.selected_index = 0
def _update_list(self):
"""Update the list with filtered commands."""
self.list_ctrl.DeleteAllItems()
for cmd, desc in self.filtered_commands:
idx = self.list_ctrl.InsertItem(self.list_ctrl.GetItemCount(), cmd)
self.list_ctrl.SetItem(idx, 1, desc)
# Resize to fit content (max 8 items visible)
item_height = 20
max_items = min(8, len(self.filtered_commands))
self.list_ctrl.SetSize((410, item_height * max_items + 4))
self.SetClientSize((410, item_height * max_items + 4))
def filter_commands(self, search_text):
"""Filter commands based on search text."""
search_lower = search_text.lower().strip()
if not search_lower:
self.filtered_commands = self.commands.copy()
else:
self.filtered_commands = [
(cmd, desc) for cmd, desc in self.commands
if cmd.lower().startswith(search_lower)
]
self.selected_index = 0
self._update_list()
if self.list_ctrl.GetItemCount() > 0:
self.list_ctrl.Select(0)
def on_item_activated(self, event):
"""Handle double-click or Enter on item."""
idx = event.GetIndex()
if 0 <= idx < len(self.filtered_commands):
cmd, _ = self.filtered_commands[idx]
if self.on_select_callback:
self.on_select_callback(cmd)
self.safe_dismiss()
def on_item_selected(self, event):
"""Handle item selection."""
self.selected_index = event.GetIndex()
def select_next(self):
"""Select next item."""
if self.list_ctrl.GetItemCount() > 0:
self.selected_index = (self.selected_index + 1) % self.list_ctrl.GetItemCount()
self.list_ctrl.Select(self.selected_index)
self.list_ctrl.EnsureVisible(self.selected_index)
def select_previous(self):
"""Select previous item."""
if self.list_ctrl.GetItemCount() > 0:
self.selected_index = (self.selected_index - 1) % self.list_ctrl.GetItemCount()
self.list_ctrl.Select(self.selected_index)
self.list_ctrl.EnsureVisible(self.selected_index)
def get_selected_command(self):
"""Get the currently selected command."""
if 0 <= self.selected_index < len(self.filtered_commands):
return self.filtered_commands[self.selected_index][0]
return None
def on_char_hook(self, event):
"""Handle keyboard events."""
keycode = event.GetKeyCode()
if keycode == wx.WXK_ESCAPE:
self.safe_dismiss()
elif keycode == wx.WXK_UP:
self.select_previous()
elif keycode == wx.WXK_DOWN:
self.select_next()
elif keycode == wx.WXK_RETURN or keycode == wx.WXK_NUMPAD_ENTER:
cmd = self.get_selected_command()
if cmd and self.on_select_callback:
self.on_select_callback(cmd)
self.safe_dismiss()
else:
event.Skip()
def on_kill_focus(self, event):
"""Handle focus loss."""
focused = wx.Window.FindFocus()
if focused and (focused == self.list_ctrl or self.IsDescendant(focused)):
event.Skip()
return
wx.CallLater(100, self.safe_dismiss)
event.Skip()
def on_destroy(self, event):
event.Skip()
def safe_dismiss(self):
"""Safely dismiss the popup."""
if not self.IsBeingDeleted() and self.IsShown():
try:
self.Dismiss()
except Exception as e:
logger.error(f"Error dismissing command autocomplete: {e}")
def show_at_input(self, input_ctrl):
"""Show popup near the input control."""
input_screen_pos = input_ctrl.ClientToScreen((0, 0))
popup_size = self.GetSize()
display_size = wx.GetDisplaySize()
# Show above the input, otherwise below
if input_screen_pos.y - popup_size.height > 0:
popup_y = input_screen_pos.y - popup_size.height - 2
else:
popup_y = input_screen_pos.y + input_ctrl.GetSize().height + 2
# Keep popup on screen horizontally
popup_x = max(10, min(input_screen_pos.x, display_size.x - popup_size.width - 10))
self.Position((popup_x, popup_y), (0, 0))
self.Popup()
class IRCPanel(wx.Panel): class IRCPanel(wx.Panel):
# IRC commands with descriptions # IRC commands with descriptions
IRC_COMMANDS = [ IRC_COMMANDS = [
@@ -673,7 +328,7 @@ class IRCPanel(wx.Panel):
self.current_popup.Dismiss() self.current_popup.Dismiss()
# Create new popup # Create new popup
self.current_popup = KaomojiPicker( self.current_popup = KaomojiPicker.KaomojiPicker(
self, self,
self.KAOMOJI_GROUPS, self.KAOMOJI_GROUPS,
self.on_kaomoji_insert self.on_kaomoji_insert
@@ -746,7 +401,7 @@ class IRCPanel(wx.Panel):
try: try:
if not self.command_popup or self.command_popup.IsBeingDeleted(): if not self.command_popup or self.command_popup.IsBeingDeleted():
# Create new popup # Create new popup
self.command_popup = CommandAutocomplete( self.command_popup = CommandAutocomplete.CommandAutocomplete( # fuckass python
self, self,
self.IRC_COMMANDS, self.IRC_COMMANDS,
self.on_command_select self.on_command_select

View File

@@ -0,0 +1,103 @@
import wx
import socket
import logging
logger = logging.getLogger(__name__)
class InterfaceSelectDialog(wx.Dialog):
"""Dialog that lets the user pick which local interface the server should bind to."""
def __init__(self, parent, current_host="127.0.0.1"):
super().__init__(parent, title="Select Network Interface", size=(420, 380))
try:
self.SetIcon(parent.GetIcon())
except Exception:
pass
self.selected_host = current_host
panel = wx.Panel(self)
panel.SetBackgroundColour(parent.theme["window_bg"])
main_sizer = wx.BoxSizer(wx.VERTICAL)
info = wx.StaticText(
panel,
label="Choose the Network interface where your server should run:\n"
"You are EXPOSING a Server to your LOCAL Network, this may give away who you are!\n",
)
info.Wrap(380)
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 10)
self.interface_list = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.interface_list.InsertColumn(0, "Interface", width=180)
self.interface_list.InsertColumn(1, "Address", width=180)
self.interfaces = self._gather_interfaces()
current_index = 0
for idx, entry in enumerate[tuple[str, str]](self.interfaces):
name, address = entry
self.interface_list.InsertItem(idx, name)
self.interface_list.SetItem(idx, 1, address)
if address == current_host:
current_index = idx
self.interface_list.Select(current_index)
self.interface_list.EnsureVisible(current_index)
if self.interfaces:
self.selected_host = self.interfaces[current_index][1]
self.interface_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_select)
self.interface_list.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_activate)
main_sizer.Add(self.interface_list, 1, wx.ALL | wx.EXPAND, 10)
button_bar = wx.StdDialogButtonSizer()
ok_btn = wx.Button(panel, wx.ID_OK)
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
button_bar.AddButton(ok_btn)
button_bar.AddButton(cancel_btn)
button_bar.Realize()
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 10)
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
panel.SetSizer(main_sizer)
def on_select(self, event):
index = event.GetIndex()
_, address = self.interfaces[index]
self.selected_host = address
def _gather_interfaces(self):
entries = [
("Loopback only", "127.0.0.1"),
("All interfaces", "0.0.0.0"),
]
try:
import psutil
seen = {addr for _, addr in entries}
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == socket.AF_INET and addr.address not in seen:
label = f"{name}"
entries.append((label, addr.address))
seen.add(addr.address)
except Exception as e:
logger.warning(f"Unable to enumerate network interfaces: {e}")
return entries
def get_selected_host(self):
return self.selected_host
def on_activate(self, event):
self.on_select(event)
self.EndModal(wx.ID_OK)
def on_ok(self, event):
index = self.interface_list.GetFirstSelected()
if index == -1 and self.interfaces:
wx.MessageBox("Select an interface before starting the server.", "No Interface Selected", wx.OK | wx.ICON_INFORMATION)
return
if index != -1:
_, address = self.interfaces[index]
self.selected_host = address
event.Skip()

197
src/KaomojiPicker.py Normal file
View File

@@ -0,0 +1,197 @@
import logging
import traceback
import wx
import wx.adv
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class KaomojiPicker(wx.PopupTransientWindow):
def __init__(self, parent, kaomoji_groups, on_select_callback):
super().__init__(parent, wx.BORDER_SIMPLE)
self.on_select_callback = on_select_callback
self.kaomoji_groups = kaomoji_groups
self._init_ui()
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
def _init_ui(self):
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Scrolled content area
self.scroll = wx.ScrolledWindow(panel, size=(380, 420), style=wx.VSCROLL)
self.scroll.SetScrollRate(0, 15)
self.scroll_sizer = wx.BoxSizer(wx.VERTICAL)
# Storage for filtering
self.all_buttons = []
self.group_headers = {}
self.group_containers = {}
# Build kaomoji groups
self._build_kaomoji_groups()
self.scroll.SetSizer(self.scroll_sizer)
main_sizer.Add(self.scroll, 1, wx.EXPAND)
panel.SetSizer(main_sizer)
main_sizer.Fit(panel)
self.SetClientSize(panel.GetBestSize())
# Bind events
self._bind_events(panel)
def _build_kaomoji_groups(self):
dc = wx.ClientDC(self.scroll)
dc.SetFont(self.scroll.GetFont())
self.scroll_sizer.AddSpacer(8)
for group_name, kaomojis in self.kaomoji_groups.items():
if not kaomojis:
continue
# Group header
header = self._create_group_header(group_name)
self.group_headers[group_name] = header
self.scroll_sizer.Add(header, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 12)
# Wrap sizer for buttons
wrap_sizer = wx.WrapSizer(wx.HORIZONTAL)
for kaomoji in kaomojis:
btn = self._create_kaomoji_button(kaomoji, dc, group_name, wrap_sizer)
self.all_buttons.append(btn)
wrap_sizer.Add(btn, 0, wx.ALL, 3)
self.group_containers[group_name] = wrap_sizer
self.scroll_sizer.Add(wrap_sizer, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 12)
def _create_group_header(self, group_name):
header = wx.StaticText(self.scroll, label=group_name)
font = header.GetFont()
font.SetWeight(wx.FONTWEIGHT_BOLD)
font.SetPointSize(9)
header.SetFont(font)
return header
def _create_kaomoji_button(self, kaomoji, dc, group_name, wrap_sizer):
text_width, text_height = dc.GetTextExtent(kaomoji)
btn_width = min(max(text_width + 20, 55), 140)
btn_height = text_height + 14
btn = wx.Button(self.scroll, label=kaomoji, size=(btn_width, btn_height))
btn.SetToolTip(f"{kaomoji} - {group_name}")
btn._kaomoji_text = kaomoji.lower()
btn._kaomoji_group = group_name.lower()
btn._group_name = group_name
# Bind click event
btn.Bind(wx.EVT_BUTTON, lambda evt, k=kaomoji: self.on_kaomoji_selected(k))
return btn
def _bind_events(self, panel):
panel.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
def on_kaomoji_selected(self, kaomoji):
try:
if self.on_select_callback:
self.on_select_callback(kaomoji)
except Exception as e:
logger.error(f"Error in kaomoji selection callback: {e}")
finally:
# Safely dismiss the popup
wx.CallAfter(self.safe_dismiss)
def on_search(self, event):
try:
search_text = self.search_ctrl.GetValue().lower().strip()
if not search_text:
self._show_all()
return
visible_groups = set()
for btn in self.all_buttons:
# Match kaomoji text or group name
is_match = (search_text in btn._kaomoji_text or
search_text in btn._kaomoji_group)
btn.Show(is_match)
if is_match:
visible_groups.add(btn._group_name)
# Show/hide group headers
for group_name, header in self.group_headers.items():
header.Show(group_name in visible_groups)
self.scroll.Layout()
self.scroll.FitInside()
except Exception as e:
logger.error(f"Error in kaomoji search: {e}")
def on_clear_search(self, event):
self.search_ctrl.SetValue("")
self._show_all()
def _show_all(self):
for btn in self.all_buttons:
btn.Show()
for header in self.group_headers.values():
header.Show()
self.scroll.Layout()
self.scroll.FitInside()
def on_char_hook(self, event):
if event.GetKeyCode() == wx.WXK_ESCAPE:
self.safe_dismiss()
else:
event.Skip()
def on_kill_focus(self, event):
focused = wx.Window.FindFocus()
if focused and (focused == self.search_ctrl or
focused.GetParent() == self.scroll or
self.scroll.IsDescendant(focused)):
event.Skip()
return
wx.CallLater(100, self.safe_dismiss)
event.Skip()
def on_destroy(self, event):
self.all_buttons.clear()
self.group_headers.clear()
self.group_containers.clear()
event.Skip()
def safe_dismiss(self):
if not self.IsBeingDeleted() and self.IsShown():
try:
self.Dismiss()
except Exception as e:
logger.error(f"Error dismissing popup: {e}")
def show_at_button(self, button):
btn_screen_pos = button.ClientToScreen((0, 0))
popup_size = self.GetSize()
display_size = wx.GetDisplaySize()
# Try to show above the button, otherwise below
if btn_screen_pos.y - popup_size.height > 0:
popup_y = btn_screen_pos.y - popup_size.height
else:
popup_y = btn_screen_pos.y + button.GetSize().height
# Keep popup on screen horizontally
popup_x = max(10, min(btn_screen_pos.x, display_size.x - popup_size.width - 10))
self.Position((popup_x, popup_y), (0, 0))
self.Popup()

View File

@@ -1,18 +1,11 @@
import ipaddress import ipaddress
import logging import logging
import socket
import threading
from typing import Callable, Iterable, List, Optional from typing import Callable, Iterable, List, Optional
from irc import server as irc_server from irc import server as irc_server
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _default_log_callback(message: str, color=None, bold: bool = False):
"""Fallback logger when UI callback is not available."""
logger.info(message)
class LocalOnlyIRCServer(irc_server.IRCServer): class LocalOnlyIRCServer(irc_server.IRCServer):
"""IRC server that only accepts connections from local/LAN addresses.""" """IRC server that only accepts connections from local/LAN addresses."""
@@ -44,172 +37,6 @@ class LocalOnlyIRCServer(irc_server.IRCServer):
return False return False
class LocalServerManager:
"""Manages the background IRC server lifecycle."""
DEFAULT_CHANNELS = ["#lobby"]
DEFAULT_ALLOWED_NETWORKS = [
ipaddress.ip_network("127.0.0.0/8"), # Loopback
ipaddress.ip_network("10.0.0.0/8"), # RFC1918
ipaddress.ip_network("172.16.0.0/12"), # RFC1918
ipaddress.ip_network("192.168.0.0/16"), # RFC1918
ipaddress.ip_network("169.254.0.0/16"), # Link-local
]
def __init__(
self,
log_callback: Callable[[str, Optional[object], bool], None] = _default_log_callback,
listen_host: str = "0.0.0.0",
listen_port: int = 6667,
):
self.log_callback = log_callback or _default_log_callback
self.listen_host = listen_host
self.listen_port = listen_port
self.allowed_networks = list(self.DEFAULT_ALLOWED_NETWORKS)
self._channels = list(self.DEFAULT_CHANNELS)
self._server: Optional[LocalOnlyIRCServer] = None
self._thread: Optional[threading.Thread] = None
self._lock = threading.RLock()
self._running = threading.Event()
self._ready = threading.Event()
self._error: Optional[Exception] = None
# Public API ---------------------------------------------------------
def start(self, timeout: float = 5.0):
"""Start the background IRC server."""
with self._lock:
if self._running.is_set():
raise RuntimeError("Local IRC server is already running.")
self._running.set()
self._ready.clear()
self._error = None
self._thread = threading.Thread(
target=self._serve_forever,
name="Local-IRC-Server",
daemon=True,
)
self._thread.start()
if not self._ready.wait(timeout):
self._running.clear()
raise TimeoutError("Local IRC server failed to start in time.")
if self._error:
raise self._error
def stop(self, timeout: float = 5.0):
"""Stop the IRC server if it is running."""
with self._lock:
if not self._running.is_set():
return
server = self._server
thread = self._thread
if server:
server.shutdown()
server.server_close()
if thread:
thread.join(timeout=timeout)
with self._lock:
self._server = None
self._thread = None
self._running.clear()
self._ready.clear()
def is_running(self) -> bool:
return self._running.is_set()
def set_listen_host(self, host: str):
"""Update the bind address. Local server must be stopped."""
with self._lock:
if self._running.is_set():
raise RuntimeError("Stop the server before changing the interface.")
self.listen_host = host
self._log(f"Local server interface set to {host}.")
def get_channels(self) -> List[str]:
with self._lock:
return list(self._channels)
def set_channels(self, channels: Iterable[str]):
cleaned = self._sanitize_channels(channels)
with self._lock:
self._channels = cleaned or list(self.DEFAULT_CHANNELS)
if self.is_running():
self._log(
"Channel list updated. Restart local server to apply changes.",
bold=True,
)
# Internal helpers ---------------------------------------------------
def _serve_forever(self):
try:
server = LocalOnlyIRCServer(
(self.listen_host, self.listen_port),
irc_server.IRCClient,
self.allowed_networks,
blocked_callback=lambda ip: self._log(
f"Blocked connection attempt from {ip}", bold=False
),
)
server.servername = socket.gethostname() or "wxirc-local"
with self._lock:
self._server = server
seed_channels = self._channels or list(self.DEFAULT_CHANNELS)
for channel in seed_channels:
server.channels.setdefault(channel, irc_server.IRCChannel(channel))
self._log(
f"Local IRC server listening on {self.listen_host}:{self.listen_port}",
bold=True,
)
self._ready.set()
server.serve_forever()
except Exception as exc:
logger.exception("Local IRC server failed: %s", exc)
self._error = exc
self._ready.set()
self._log(f"Local server error: {exc}", bold=True)
finally:
self._running.clear()
self._log("Local IRC server stopped.")
def _sanitize_channels(self, channels: Iterable[str]) -> List[str]:
unique = []
seen = set()
for channel in channels:
if not channel:
continue
name = channel.strip()
if not name.startswith("#"):
name = f"#{name}"
if not self._is_valid_channel(name):
continue
if name.lower() not in seen:
unique.append(name)
seen.add(name.lower())
return unique
@staticmethod
def _is_valid_channel(name: str) -> bool:
if len(name) < 2:
return False
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
return all(ch in allowed for ch in name)
def _log(self, message: str, color=None, bold: bool = False):
try:
self.log_callback(message, color, bold)
except Exception:
logger.info(message)
if __name__ == "__main__": if __name__ == "__main__":
import argparse import argparse
@@ -246,7 +73,7 @@ if __name__ == "__main__":
) )
# Initialize the server manager # Initialize the server manager
manager = LocalServerManager( manager = LocalServerManager( # pyright: ignore[reportUndefinedVariable]
listen_host=args.host, listen_host=args.host,
listen_port=args.port listen_port=args.port
) )

179
src/LocalServerManager.py Normal file
View File

@@ -0,0 +1,179 @@
import ipaddress
import socket
import threading
import logging
from typing import Callable, Iterable, List, Optional
from LocalServer import LocalOnlyIRCServer
from irc import server as irc_server
logger = logging.getLogger(__name__)
def _default_log_callback(message: str, color=None, bold: bool = False):
"""Fallback logger when UI callback is not available."""
logger.info(message)
class LocalServerManager:
"""Manages the background IRC server lifecycle."""
DEFAULT_CHANNELS = ["#lobby"]
DEFAULT_ALLOWED_NETWORKS = [
ipaddress.ip_network("127.0.0.0/8"), # Loopback
ipaddress.ip_network("10.0.0.0/8"), # RFC1918
ipaddress.ip_network("172.16.0.0/12"), # RFC1918
ipaddress.ip_network("192.168.0.0/16"), # RFC1918
ipaddress.ip_network("169.254.0.0/16"), # Link-local
]
def __init__(
self,
log_callback: Callable[[str, Optional[object], bool], None] = _default_log_callback,
listen_host: str = "0.0.0.0",
listen_port: int = 6667,
):
self.log_callback = log_callback or _default_log_callback
self.listen_host = listen_host
self.listen_port = listen_port
self.allowed_networks = list(self.DEFAULT_ALLOWED_NETWORKS)
self._channels = list(self.DEFAULT_CHANNELS)
self._server = None
self._thread = None
self._lock = threading.RLock()
self._running = threading.Event()
self._ready = threading.Event()
self._error: Optional[Exception] = None
# Public API ---------------------------------------------------------
def start(self, timeout: float = 5.0):
"""Start the background IRC server."""
with self._lock:
if self._running.is_set():
raise RuntimeError("Local IRC server is already running.")
self._running.set()
self._ready.clear()
self._error = None
self._thread = threading.Thread(
target=self._serve_forever,
name="Local-IRC-Server",
daemon=True,
)
self._thread.start()
if not self._ready.wait(timeout):
self._running.clear()
raise TimeoutError("Local IRC server failed to start in time.")
if self._error:
raise self._error
def stop(self, timeout: float = 5.0):
"""Stop the IRC server if it is running."""
with self._lock:
if not self._running.is_set():
return
server = self._server
thread = self._thread
if server:
server.shutdown()
server.server_close()
if thread:
thread.join(timeout=timeout)
with self._lock:
self._server = None
self._thread = None
self._running.clear()
self._ready.clear()
def is_running(self) -> bool:
return self._running.is_set()
def set_listen_host(self, host: str):
"""Update the bind address. Local server must be stopped."""
with self._lock:
if self._running.is_set():
raise RuntimeError("Stop the server before changing the interface.")
self.listen_host = host
self._log(f"Local server interface set to {host}.")
def get_channels(self) -> List[str]:
with self._lock:
return list(self._channels)
def set_channels(self, channels: Iterable[str]):
cleaned = self._sanitize_channels(channels)
with self._lock:
self._channels = cleaned or list(self.DEFAULT_CHANNELS)
if self.is_running():
self._log(
"Channel list updated. Restart local server to apply changes.",
bold=True,
)
# Internal helpers ---------------------------------------------------
def _serve_forever(self):
try:
server = LocalOnlyIRCServer(
(self.listen_host, self.listen_port),
irc_server.IRCClient,
self.allowed_networks,
blocked_callback=lambda ip: self._log(
f"Blocked connection attempt from {ip}", bold=False
),
)
server.servername = socket.gethostname() or "wxirc-local"
with self._lock:
self._server = server
seed_channels = self._channels or list(self.DEFAULT_CHANNELS)
for channel in seed_channels:
server.channels.setdefault(channel, irc_server.IRCChannel(channel))
self._log(
f"Local IRC server listening on {self.listen_host}:{self.listen_port}",
bold=True,
)
self._ready.set()
server.serve_forever()
except Exception as exc:
logger.exception("Local IRC server failed: %s", exc)
self._error = exc
self._ready.set()
self._log(f"Local server error: {exc}", bold=True)
finally:
self._running.clear()
self._log("Local IRC server stopped.")
def _sanitize_channels(self, channels: Iterable[str]) -> List[str]:
unique = []
seen = set()
for channel in channels:
if not channel:
continue
name = channel.strip()
if not name.startswith("#"):
name = f"#{name}"
if not self._is_valid_channel(name):
continue
if name.lower() not in seen:
unique.append(name)
seen.add(name.lower())
return unique
@staticmethod
def _is_valid_channel(name: str) -> bool:
if len(name) < 2:
return False
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
return all(ch in allowed for ch in name)
def _log(self, message: str, color=None, bold: bool = False):
try:
self.log_callback(message, color, bold)
except Exception:
logger.info(message)

104
src/ManageChannelsDialog.py Normal file
View File

@@ -0,0 +1,104 @@
import wx
import wx.adv
class ManageChannelsDialog(wx.Dialog):
"""Simple dialog for curating the local server channel allowlist."""
def __init__(self, parent, channels):
super().__init__(parent, title="Manage Local Channels", size=(360, 420))
try:
self.SetIcon(parent.GetIcon())
except Exception:
pass
panel = wx.Panel(self)
panel.SetBackgroundColour(parent.theme["window_bg"])
main_sizer = wx.BoxSizer(wx.VERTICAL)
info = wx.StaticText(
panel,
label="Channels are shared with anyone on your LAN who joins the built-in server.",
)
info.Wrap(320)
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 8)
self.list_box = wx.ListBox(panel)
for channel in channels:
self.list_box.Append(channel)
main_sizer.Add(self.list_box, 1, wx.ALL | wx.EXPAND, 8)
input_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.channel_input = wx.TextCtrl(panel, style=wx.TE_PROCESS_ENTER)
self.channel_input.SetHint("#channel-name")
add_btn = wx.Button(panel, label="Add")
input_sizer.Add(self.channel_input, 1, wx.RIGHT, 4)
input_sizer.Add(add_btn, 0)
main_sizer.Add(input_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 8)
remove_btn = wx.Button(panel, label="Remove Selected")
reset_btn = wx.Button(panel, label="Reset to #lobby")
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
btn_sizer.Add(remove_btn, 1, wx.RIGHT, 4)
btn_sizer.Add(reset_btn, 1)
main_sizer.Add(btn_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP, 8)
button_bar = wx.StdDialogButtonSizer()
ok_btn = wx.Button(panel, wx.ID_OK)
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
button_bar.AddButton(ok_btn)
button_bar.AddButton(cancel_btn)
button_bar.Realize()
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 8)
panel.SetSizer(main_sizer)
# Bindings
add_btn.Bind(wx.EVT_BUTTON, self.on_add)
remove_btn.Bind(wx.EVT_BUTTON, self.on_remove)
reset_btn.Bind(wx.EVT_BUTTON, self.on_reset)
self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_add)
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
def get_channels(self):
return [self.list_box.GetString(i) for i in range(self.list_box.GetCount())]
def on_add(self, event):
value = self.channel_input.GetValue().strip()
if not value:
return
if not value.startswith('#'):
value = f"#{value}"
if not self._is_valid_channel(value):
wx.MessageBox(
"Channel names may contain letters, numbers, -, and _ only.",
"Invalid Channel",
wx.OK | wx.ICON_WARNING,
)
return
if self.list_box.FindString(value) != wx.NOT_FOUND:
wx.MessageBox("Channel already exists.", "Duplicate Channel", wx.OK | wx.ICON_INFORMATION)
return
self.list_box.Append(value)
self.channel_input.Clear()
def on_remove(self, event):
selection = self.list_box.GetSelection()
if selection != wx.NOT_FOUND:
self.list_box.Delete(selection)
def on_reset(self, event):
self.list_box.Clear()
self.list_box.Append("#lobby")
def on_ok(self, event):
if self.list_box.GetCount() == 0:
wx.MessageBox("Add at least one channel before saving.", "No Channels", wx.OK | wx.ICON_INFORMATION)
return
event.Skip()
@staticmethod
def _is_valid_channel(name):
if len(name) < 2:
return False
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
return all(ch in allowed for ch in name)

120
src/ScanHandler.py Normal file
View File

@@ -0,0 +1,120 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
import socket
import threading
import psutil
import ipaddress
import logging
logger = logging.getLogger(__name__)
class ScanHandler:
"""Fast local network IRC scanner with minimal network overhead."""
def __init__(self, timeout=0.35, max_workers=64):
self.timeout = timeout
self.max_workers = max_workers
self._stop_event = threading.Event()
self._thread = None
self.results = []
self.total_hosts = 0
def detect_networks(self):
"""Return private IPv4 networks discovered on local interfaces."""
networks = []
try:
for iface, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family != socket.AF_INET or not addr.netmask:
continue
try:
interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}")
except ValueError:
continue
if not interface.network.is_private:
continue
network = self._cap_network(interface)
label = f"{iface} : {interface.ip} through {network.with_prefixlen} range"
networks.append({"label": label, "cidr": str(network)})
except Exception as exc:
logger.error("Failed to enumerate interfaces: %s", exc)
if not networks:
default_net = "192.168.1.0/24"
label = f"Default guess : {default_net}"
networks.append({"label": label, "cidr": default_net})
return networks
def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None):
"""Launch the threaded scan."""
if self._thread and self._thread.is_alive():
return False
try:
network = ipaddress.ip_network(network_cidr, strict=False)
except ValueError:
return False
hosts = [str(host) for host in network.hosts()]
if not hosts:
hosts = [str(network.network_address)]
self.total_hosts = len(hosts)
self.results.clear()
self._stop_event.clear()
def _worker():
logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr)
scanned = 0
try:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self._probe_host, host, ports): host for host in hosts}
for future in as_completed(futures):
if self._stop_event.is_set():
break
scanned += 1
server_info = future.result()
if server_info:
self.results.append(server_info)
if result_cb:
result_cb(server_info)
if progress_cb:
progress_cb(scanned, self.total_hosts)
except Exception as exc:
logger.error("Scan failure: %s", exc)
finally:
if done_cb:
done_cb(self.results)
logger.info("IRC scan finished (%s discovered)", len(self.results))
self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True)
self._thread.start()
return True
def stop_scan(self):
self._stop_event.set()
def _probe_host(self, host, ports):
if self._stop_event.is_set():
return None
for port in ports:
try:
with socket.create_connection((host, port), timeout=self.timeout) as sock:
sock.settimeout(0.2)
banner = ""
try:
chunk = sock.recv(256)
if chunk:
banner = chunk.decode(errors="ignore").strip()
except socket.timeout:
banner = "IRC server (silent banner)"
except OSError:
pass
return {"address": host, "port": port, "banner": banner or "IRC server detected"}
except (socket.timeout, ConnectionRefusedError, OSError):
continue
return None
@staticmethod
def _cap_network(interface):
"""Cap huge networks to /24 to keep scans lightweight."""
if interface.network.prefixlen >= 24:
return interface.network
return ipaddress.ip_network(f"{interface.ip}/24", strict=False)

View File

@@ -1,299 +1,19 @@
import ipaddress
import logging import logging
import socket import ScanHandler
import threading import ScanWizardIntroPage
from concurrent.futures import ThreadPoolExecutor, as_completed import ScanWizardResultsPage
import psutil
import wx import wx
import wx.adv as adv import wx.adv as adv
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ScanHandler:
"""Fast local network IRC scanner with minimal network overhead."""
def __init__(self, timeout=0.35, max_workers=64):
self.timeout = timeout
self.max_workers = max_workers
self._stop_event = threading.Event()
self._thread = None
self.results = []
self.total_hosts = 0
def detect_networks(self):
"""Return private IPv4 networks discovered on local interfaces."""
networks = []
try:
for iface, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family != socket.AF_INET or not addr.netmask:
continue
try:
interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}")
except ValueError:
continue
if not interface.network.is_private:
continue
network = self._cap_network(interface)
label = f"{iface} : {interface.ip} through {network.with_prefixlen} range"
networks.append({"label": label, "cidr": str(network)})
except Exception as exc:
logger.error("Failed to enumerate interfaces: %s", exc)
if not networks:
default_net = "192.168.1.0/24"
label = f"Default guess : {default_net}"
networks.append({"label": label, "cidr": default_net})
return networks
def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None):
"""Launch the threaded scan."""
if self._thread and self._thread.is_alive():
return False
try:
network = ipaddress.ip_network(network_cidr, strict=False)
except ValueError:
return False
hosts = [str(host) for host in network.hosts()]
if not hosts:
hosts = [str(network.network_address)]
self.total_hosts = len(hosts)
self.results.clear()
self._stop_event.clear()
def _worker():
logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr)
scanned = 0
try:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self._probe_host, host, ports): host for host in hosts}
for future in as_completed(futures):
if self._stop_event.is_set():
break
scanned += 1
server_info = future.result()
if server_info:
self.results.append(server_info)
if result_cb:
result_cb(server_info)
if progress_cb:
progress_cb(scanned, self.total_hosts)
except Exception as exc:
logger.error("Scan failure: %s", exc)
finally:
if done_cb:
done_cb(self.results)
logger.info("IRC scan finished (%s discovered)", len(self.results))
self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True)
self._thread.start()
return True
def stop_scan(self):
self._stop_event.set()
def _probe_host(self, host, ports):
if self._stop_event.is_set():
return None
for port in ports:
try:
with socket.create_connection((host, port), timeout=self.timeout) as sock:
sock.settimeout(0.2)
banner = ""
try:
chunk = sock.recv(256)
if chunk:
banner = chunk.decode(errors="ignore").strip()
except socket.timeout:
banner = "IRC server (silent banner)"
except OSError:
pass
return {"address": host, "port": port, "banner": banner or "IRC server detected"}
except (socket.timeout, ConnectionRefusedError, OSError):
continue
return None
@staticmethod
def _cap_network(interface):
"""Cap huge networks to /24 to keep scans lightweight."""
if interface.network.prefixlen >= 24:
return interface.network
return ipaddress.ip_network(f"{interface.ip}/24", strict=False)
class ScanWizardIntroPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler):
super().__init__(parent)
self.scan_handler = scan_handler
self.networks = self.scan_handler.detect_networks()
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.")
intro.Wrap(420)
sizer.Add(intro, 0, wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8)
labels = [net["label"] for net in self.networks]
self.network_choice = wx.Choice(self, choices=labels)
if labels:
self.network_choice.SetSelection(0)
sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8)
self.port_ctrl = wx.TextCtrl(self, value="6667,6697")
sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5)
self.SetSizer(sizer)
def get_scan_params(self):
selection = self.network_choice.GetSelection()
if selection == wx.NOT_FOUND:
wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING)
return None
cidr = self.networks[selection]["cidr"]
raw_ports = self.port_ctrl.GetValue().split(",")
ports = []
for raw in raw_ports:
raw = raw.strip()
if not raw:
continue
try:
port = int(raw)
if 1 <= port <= 65535:
ports.append(port)
except ValueError:
continue
if not ports:
wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING)
return None
try:
network = ipaddress.ip_network(cidr, strict=False)
except ValueError:
wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR)
return None
host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1)
return {"cidr": str(network), "ports": ports, "host_count": host_count}
class ScanWizardResultsPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler, main_frame):
super().__init__(parent)
self.scan_handler = scan_handler
self.main_frame = main_frame
self.discovered = []
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
self.summary = wx.StaticText(self, label="Waiting to start…")
sizer.Add(self.summary, 0, wx.ALL, 5)
self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH)
sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5)
self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.results_list.InsertColumn(0, "Address", width=140)
self.results_list.InsertColumn(1, "Port", width=60)
self.results_list.InsertColumn(2, "Details", width=260)
self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons)
self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons)
sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5)
btn_row = wx.BoxSizer(wx.HORIZONTAL)
self.quick_connect_btn = wx.Button(self, label="Quick Connect")
self.quick_connect_btn.Disable()
self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect)
btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5)
self.rescan_btn = wx.Button(self, label="Rescan")
self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan)
btn_row.Add(self.rescan_btn, 0)
sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5)
self.SetSizer(sizer)
def prepare_for_scan(self, params, start_callback):
self.results_list.DeleteAllItems()
self.discovered = []
self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}")
self.gauge.SetRange(max(params["host_count"], 1))
self.gauge.SetValue(0)
self.quick_connect_btn.Disable()
start_callback(params)
def on_scan_progress(self, scanned, total):
try:
total = max(total, 1)
self.gauge.SetRange(total)
self.gauge.SetValue(min(scanned, total))
self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked")
except RuntimeError:
# C++ SHIT
logger.debug("Scan progress update after controls destroyed; ignoring")
def on_scan_result(self, server_info):
"""Handle a single discovered server row."""
try:
idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"])
self.results_list.SetItem(idx, 1, str(server_info["port"]))
self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected"))
self.discovered.append(server_info)
self.summary.SetLabel(
f"Found {len(self.discovered)} {'server' if len(self.discovered) == 1 else 'servers'}"
)
except RuntimeError:
logger.debug("Scan result update after controls destroyed; ignoring")
def on_scan_complete(self, results):
"""Final scan completion callback."""
try:
if results:
self.summary.SetLabel(
f"Scan complete : {len(results)} "
f"{'server' if len(results) == 1 else 'servers'} ready."
)
else:
self.summary.SetLabel("Scan complete : no IRC servers discovered.")
self._toggle_buttons()
except RuntimeError:
logger.debug("Scan completion update after controls destroyed; ignoring")
def on_quick_connect(self, event):
row = self.results_list.GetFirstSelected()
if row == -1:
return
server = self.results_list.GetItemText(row, 0)
port = int(self.results_list.GetItemText(row, 1))
if self.main_frame.quick_connect(server, port):
self.GetParent().EndModal(wx.ID_OK)
def on_rescan(self, event):
wizard = self.GetParent()
wizard.ShowPage(wizard.intro_page)
def _toggle_buttons(self, event=None):
has_selection = self.results_list.GetFirstSelected() != -1
if has_selection:
self.quick_connect_btn.Enable()
else:
self.quick_connect_btn.Disable()
class ScanWizardDialog(adv.Wizard): class ScanWizardDialog(adv.Wizard):
"""Wizard that drives the ScanHandler workflow."""
def __init__(self, parent): def __init__(self, parent):
super().__init__(parent, title="wxScan") super().__init__(parent, title="wxScan")
self.scan_handler = ScanHandler() self.scan_handler = ScanHandler.ScanHandler()
self.intro_page = ScanWizardIntroPage(self, self.scan_handler) self.intro_page = ScanWizardIntroPage.ScanWizardIntroPage(self, self.scan_handler)
self.results_page = ScanWizardResultsPage(self, self.scan_handler, parent) self.results_page = ScanWizardResultsPage.ScanWizardResultsPage(self, self.scan_handler, parent)
self._chain_pages() self._chain_pages()
self.Bind(adv.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing) self.Bind(adv.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing)
self.Bind(adv.EVT_WIZARD_CANCEL, self.on_cancel) self.Bind(adv.EVT_WIZARD_CANCEL, self.on_cancel)

View File

@@ -0,0 +1,59 @@
import wx
import wx.adv as adv
import ipaddress
class ScanWizardIntroPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler):
super().__init__(parent)
self.scan_handler = scan_handler
self.networks = self.scan_handler.detect_networks()
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.")
intro.Wrap(420)
sizer.Add(intro, 0, wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8)
labels = [net["label"] for net in self.networks]
self.network_choice = wx.Choice(self, choices=labels)
if labels:
self.network_choice.SetSelection(0)
sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8)
self.port_ctrl = wx.TextCtrl(self, value="6667,6697")
sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5)
self.SetSizer(sizer)
def get_scan_params(self):
selection = self.network_choice.GetSelection()
if selection == wx.NOT_FOUND:
wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING)
return None
cidr = self.networks[selection]["cidr"]
raw_ports = self.port_ctrl.GetValue().split(",")
ports = []
for raw in raw_ports:
raw = raw.strip()
if not raw:
continue
try:
port = int(raw)
if 1 <= port <= 65535:
ports.append(port)
except ValueError:
continue
if not ports:
wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING)
return None
try:
network = ipaddress.ip_network(cidr, strict=False)
except ValueError:
wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR)
return None
host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1)
return {"cidr": str(network), "ports": ports, "host_count": host_count}

View File

@@ -0,0 +1,109 @@
import wx
import wx.adv as adv
import logging
logger = logging.getLogger(__name__)
class ScanWizardResultsPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler, main_frame):
super().__init__(parent)
self.scan_handler = scan_handler
self.main_frame = main_frame
self.discovered = []
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
self.summary = wx.StaticText(self, label="Waiting to start…")
sizer.Add(self.summary, 0, wx.ALL, 5)
self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH)
sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5)
self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.results_list.InsertColumn(0, "Address", width=140)
self.results_list.InsertColumn(1, "Port", width=60)
self.results_list.InsertColumn(2, "Details", width=260)
self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons)
self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons)
sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5)
btn_row = wx.BoxSizer(wx.HORIZONTAL)
self.quick_connect_btn = wx.Button(self, label="Quick Connect")
self.quick_connect_btn.Disable()
self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect)
btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5)
self.rescan_btn = wx.Button(self, label="Rescan")
self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan)
btn_row.Add(self.rescan_btn, 0)
sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5)
self.SetSizer(sizer)
def prepare_for_scan(self, params, start_callback):
self.results_list.DeleteAllItems()
self.discovered = []
self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}")
self.gauge.SetRange(max(params["host_count"], 1))
self.gauge.SetValue(0)
self.quick_connect_btn.Disable()
start_callback(params)
def on_scan_progress(self, scanned, total):
try:
total = max(total, 1)
self.gauge.SetRange(total)
self.gauge.SetValue(min(scanned, total))
self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked")
except RuntimeError:
# C++ SHIT
logger.debug("Scan progress update after controls destroyed; ignoring")
def on_scan_result(self, server_info):
"""Handle a single discovered server row."""
try:
idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"])
self.results_list.SetItem(idx, 1, str(server_info["port"]))
self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected"))
self.discovered.append(server_info)
self.summary.SetLabel(
f"Found {len(self.discovered)} {'server' if len(self.discovered) == 1 else 'servers'}"
)
except RuntimeError:
logger.debug("Scan result update after controls destroyed; ignoring")
def on_scan_complete(self, results):
"""Final scan completion callback."""
try:
if results:
self.summary.SetLabel(
f"Scan complete : {len(results)} "
f"{'server' if len(results) == 1 else 'servers'} ready."
)
else:
self.summary.SetLabel("Scan complete : no IRC servers discovered.")
self._toggle_buttons()
except RuntimeError:
logger.debug("Scan completion update after controls destroyed; ignoring")
def on_quick_connect(self, event):
row = self.results_list.GetFirstSelected()
if row == -1:
return
server = self.results_list.GetItemText(row, 0)
port = int(self.results_list.GetItemText(row, 1))
if self.main_frame.quick_connect(server, port):
self.GetParent().EndModal(wx.ID_OK)
def on_rescan(self, event):
wizard = self.GetParent()
wizard.ShowPage(wizard.intro_page)
def _toggle_buttons(self, event=None):
has_selection = self.results_list.GetFirstSelected() != -1
if has_selection:
self.quick_connect_btn.Enable()
else:
self.quick_connect_btn.Disable()

View File

@@ -17,10 +17,12 @@ import sys
from PrivacyNoticeDialog import PrivacyNoticeDialog from PrivacyNoticeDialog import PrivacyNoticeDialog
from IRCPanel import IRCPanel from IRCPanel import IRCPanel
from AboutDialog import AboutDialog from AboutDialog import AboutDialog
from InterfaceSelectDialog import InterfaceSelectDialog
from ManageChannelsDialog import ManageChannelsDialog
from NotesDialog import NotesDialog from NotesDialog import NotesDialog
if os.name == "nt": from Win32API import Win32API ; from Win32SoundHandler import Win32SoundHandler if os.name == "nt": from Win32API import Win32API ; from Win32SoundHandler import Win32SoundHandler
from ScanWizard import ScanWizardDialog from ScanWizard import ScanWizardDialog
from LocalServer import LocalServerManager from LocalServerManager import LocalServerManager
# Set up logging # Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -43,207 +45,6 @@ class UIUpdate:
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
class ManageChannelsDialog(wx.Dialog):
"""Simple dialog for curating the local server channel allowlist."""
def __init__(self, parent, channels):
super().__init__(parent, title="Manage Local Channels", size=(360, 420))
try:
self.SetIcon(parent.GetIcon())
except Exception:
pass
panel = wx.Panel(self)
panel.SetBackgroundColour(parent.theme["window_bg"])
main_sizer = wx.BoxSizer(wx.VERTICAL)
info = wx.StaticText(
panel,
label="Channels are shared with anyone on your LAN who joins the built-in server.",
)
info.Wrap(320)
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 8)
self.list_box = wx.ListBox(panel)
for channel in channels:
self.list_box.Append(channel)
main_sizer.Add(self.list_box, 1, wx.ALL | wx.EXPAND, 8)
input_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.channel_input = wx.TextCtrl(panel, style=wx.TE_PROCESS_ENTER)
self.channel_input.SetHint("#channel-name")
add_btn = wx.Button(panel, label="Add")
input_sizer.Add(self.channel_input, 1, wx.RIGHT, 4)
input_sizer.Add(add_btn, 0)
main_sizer.Add(input_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 8)
remove_btn = wx.Button(panel, label="Remove Selected")
reset_btn = wx.Button(panel, label="Reset to #lobby")
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
btn_sizer.Add(remove_btn, 1, wx.RIGHT, 4)
btn_sizer.Add(reset_btn, 1)
main_sizer.Add(btn_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP, 8)
button_bar = wx.StdDialogButtonSizer()
ok_btn = wx.Button(panel, wx.ID_OK)
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
button_bar.AddButton(ok_btn)
button_bar.AddButton(cancel_btn)
button_bar.Realize()
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 8)
panel.SetSizer(main_sizer)
# Bindings
add_btn.Bind(wx.EVT_BUTTON, self.on_add)
remove_btn.Bind(wx.EVT_BUTTON, self.on_remove)
reset_btn.Bind(wx.EVT_BUTTON, self.on_reset)
self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_add)
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
def get_channels(self):
return [self.list_box.GetString(i) for i in range(self.list_box.GetCount())]
def on_add(self, event):
value = self.channel_input.GetValue().strip()
if not value:
return
if not value.startswith('#'):
value = f"#{value}"
if not self._is_valid_channel(value):
wx.MessageBox(
"Channel names may contain letters, numbers, -, and _ only.",
"Invalid Channel",
wx.OK | wx.ICON_WARNING,
)
return
if self.list_box.FindString(value) != wx.NOT_FOUND:
wx.MessageBox("Channel already exists.", "Duplicate Channel", wx.OK | wx.ICON_INFORMATION)
return
self.list_box.Append(value)
self.channel_input.Clear()
def on_remove(self, event):
selection = self.list_box.GetSelection()
if selection != wx.NOT_FOUND:
self.list_box.Delete(selection)
def on_reset(self, event):
self.list_box.Clear()
self.list_box.Append("#lobby")
def on_ok(self, event):
if self.list_box.GetCount() == 0:
wx.MessageBox("Add at least one channel before saving.", "No Channels", wx.OK | wx.ICON_INFORMATION)
return
event.Skip()
@staticmethod
def _is_valid_channel(name):
if len(name) < 2:
return False
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
return all(ch in allowed for ch in name)
class InterfaceSelectDialog(wx.Dialog):
"""Dialog that lets the user pick which local interface the server should bind to."""
def __init__(self, parent, current_host="127.0.0.1"):
super().__init__(parent, title="Select Network Interface", size=(420, 380))
try:
self.SetIcon(parent.GetIcon())
except Exception:
pass
self.selected_host = current_host
panel = wx.Panel(self)
panel.SetBackgroundColour(parent.theme["window_bg"])
main_sizer = wx.BoxSizer(wx.VERTICAL)
info = wx.StaticText(
panel,
label="Choose the Network interface where your server should run:\n"
"You are EXPOSING a Server to your LOCAL Network, this may give away who you are!\n",
)
info.Wrap(380)
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 10)
self.interface_list = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.interface_list.InsertColumn(0, "Interface", width=180)
self.interface_list.InsertColumn(1, "Address", width=180)
self.interfaces = self._gather_interfaces()
current_index = 0
for idx, entry in enumerate[tuple[str, str]](self.interfaces):
name, address = entry
self.interface_list.InsertItem(idx, name)
self.interface_list.SetItem(idx, 1, address)
if address == current_host:
current_index = idx
self.interface_list.Select(current_index)
self.interface_list.EnsureVisible(current_index)
if self.interfaces:
self.selected_host = self.interfaces[current_index][1]
self.interface_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_select)
self.interface_list.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_activate)
main_sizer.Add(self.interface_list, 1, wx.ALL | wx.EXPAND, 10)
button_bar = wx.StdDialogButtonSizer()
ok_btn = wx.Button(panel, wx.ID_OK)
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
button_bar.AddButton(ok_btn)
button_bar.AddButton(cancel_btn)
button_bar.Realize()
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 10)
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
panel.SetSizer(main_sizer)
def on_select(self, event):
index = event.GetIndex()
_, address = self.interfaces[index]
self.selected_host = address
def _gather_interfaces(self):
entries = [
("Loopback only", "127.0.0.1"),
("All interfaces", "0.0.0.0"),
]
try:
import psutil
seen = {addr for _, addr in entries}
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == socket.AF_INET and addr.address not in seen:
label = f"{name}"
entries.append((label, addr.address))
seen.add(addr.address)
except Exception as e:
logger.warning(f"Unable to enumerate network interfaces: {e}")
return entries
def get_selected_host(self):
return self.selected_host
def on_activate(self, event):
self.on_select(event)
self.EndModal(wx.ID_OK)
def on_ok(self, event):
index = self.interface_list.GetFirstSelected()
if index == -1 and self.interfaces:
wx.MessageBox("Select an interface before starting the server.", "No Interface Selected", wx.OK | wx.ICON_INFORMATION)
return
if index != -1:
_, address = self.interfaces[index]
self.selected_host = address
event.Skip()
class IRCFrame(wx.Frame): class IRCFrame(wx.Frame):
def __init__(self): def __init__(self):
super().__init__(None, title="wxIRC", size=(1200, 700)) super().__init__(None, title="wxIRC", size=(1200, 700))