5 Commits

Author SHA1 Message Date
00c111da73 some cleanup, seperating classes into their own files. 2026-03-11 12:14:30 +01:00
7301186102 icon for server 2025-12-15 12:59:28 +01:00
seppmutterman@gmail.com
f1ed8d36c4 fixed the build script 2025-12-14 18:47:17 +01:00
seppmutterman@gmail.com
69b10b0864 testing stuff ; testing branch lol 2025-12-14 18:35:51 +01:00
faeac6c96f some sound additions 2025-12-12 21:15:07 +01:00
21 changed files with 1569 additions and 1037 deletions

View File

@@ -10,8 +10,11 @@ pyinstaller `
--hidden-import wx._xml `
--add-data "FiraCode-Regular.ttf;." `
--add-data "FiraCode-SemiBold.ttf;." `
--add-data "src\sounds\*;sounds" `
--add-data "venv\Lib\site-packages\irc\codes.txt;irc" `
--add-data "icon.ico;." `
--add-data "src\channel.ico;." `
--add-data "src\server.ico;." `
--icon "icon.ico" `
"src/main.py"

View File

@@ -31,31 +31,20 @@ class AboutDialog(wx.Dialog):
info_text = wx.StaticText(self, label="wxIRC Client")
info_font = wx.Font(14, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)
info_text.SetFont(info_font)
sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
import base64, hashlib
self.encoded = base64.b64encode(
hashlib.sha256(f"{self.GetId()}-{self.GetHandle()}".encode()).digest()
).decode('utf-8')
version_text = wx.StaticText(self, label=f"wxIRC V:e1")
version_text = wx.StaticText(self, label="Version : 0.2.3") # COMMIT_HUNDRED:COMMIT_TEN:COMMIT_ONE e.g 23 commits -> 0.2.3
version_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
version_text.SetFont(version_font)
rand_hash = wx.StaticText(self, label=f"{self.encoded}")
rand_hash.SetFont(version_font)
contrubutors_text = wx.StaticText(self, label="This software may not be used for commercial purposes. \n And may not be distributed for commercial purposes.")
contrubutors_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
contrubutors_text.SetFont(contrubutors_font)
# Add info to sizer
sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
sizer.Add(version_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
sizer.Add(rand_hash, 0, wx.ALL | wx.ALIGN_CENTER, 5)
sizer.Add(contrubutors_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
contributors_text = wx.StaticText(self, label="""
Contributors:
* rattatwinko (rattatwinko.servecounterstrike.com)
""")
contributors_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
contributors_text.SetFont(contributors_font)
sizer.Add(contributors_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
# OK button
ok_btn = wx.Button(self, wx.ID_OK, "OK")

161
src/CommandAutocomplete.py Normal file
View File

@@ -0,0 +1,161 @@
import wx
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class CommandAutocomplete(wx.PopupTransientWindow):
"""Popup window for IRC command autocomplete, similar to Minecraft."""
def __init__(self, parent, commands, on_select_callback):
super().__init__(parent, wx.BORDER_SIMPLE)
self.on_select_callback = on_select_callback
self.commands = commands # List of (command, description) tuples
self.filtered_commands = commands.copy()
self.selected_index = 0
self._init_ui()
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
def _init_ui(self):
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Command list
self.list_ctrl = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER)
self.list_ctrl.InsertColumn(0, "Command", width=120)
self.list_ctrl.InsertColumn(1, "Description", width=280)
self._update_list()
# Bind events
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_item_activated)
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
main_sizer.Add(self.list_ctrl, 1, wx.EXPAND | wx.ALL, 2)
panel.SetSizer(main_sizer)
main_sizer.Fit(panel)
self.SetClientSize(panel.GetBestSize())
# Select first item
if self.list_ctrl.GetItemCount() > 0:
self.list_ctrl.Select(0)
self.selected_index = 0
def _update_list(self):
"""Update the list with filtered commands."""
self.list_ctrl.DeleteAllItems()
for cmd, desc in self.filtered_commands:
idx = self.list_ctrl.InsertItem(self.list_ctrl.GetItemCount(), cmd)
self.list_ctrl.SetItem(idx, 1, desc)
# Resize to fit content (max 8 items visible)
item_height = 20
max_items = min(8, len(self.filtered_commands))
self.list_ctrl.SetSize((410, item_height * max_items + 4))
self.SetClientSize((410, item_height * max_items + 4))
def filter_commands(self, search_text):
"""Filter commands based on search text."""
search_lower = search_text.lower().strip()
if not search_lower:
self.filtered_commands = self.commands.copy()
else:
self.filtered_commands = [
(cmd, desc) for cmd, desc in self.commands
if cmd.lower().startswith(search_lower)
]
self.selected_index = 0
self._update_list()
if self.list_ctrl.GetItemCount() > 0:
self.list_ctrl.Select(0)
def on_item_activated(self, event):
"""Handle double-click or Enter on item."""
idx = event.GetIndex()
if 0 <= idx < len(self.filtered_commands):
cmd, _ = self.filtered_commands[idx]
if self.on_select_callback:
self.on_select_callback(cmd)
self.safe_dismiss()
def on_item_selected(self, event):
"""Handle item selection."""
self.selected_index = event.GetIndex()
def select_next(self):
"""Select next item."""
if self.list_ctrl.GetItemCount() > 0:
self.selected_index = (self.selected_index + 1) % self.list_ctrl.GetItemCount()
self.list_ctrl.Select(self.selected_index)
self.list_ctrl.EnsureVisible(self.selected_index)
def select_previous(self):
"""Select previous item."""
if self.list_ctrl.GetItemCount() > 0:
self.selected_index = (self.selected_index - 1) % self.list_ctrl.GetItemCount()
self.list_ctrl.Select(self.selected_index)
self.list_ctrl.EnsureVisible(self.selected_index)
def get_selected_command(self):
"""Get the currently selected command."""
if 0 <= self.selected_index < len(self.filtered_commands):
return self.filtered_commands[self.selected_index][0]
return None
def on_char_hook(self, event):
"""Handle keyboard events."""
keycode = event.GetKeyCode()
if keycode == wx.WXK_ESCAPE:
self.safe_dismiss()
elif keycode == wx.WXK_UP:
self.select_previous()
elif keycode == wx.WXK_DOWN:
self.select_next()
elif keycode == wx.WXK_RETURN or keycode == wx.WXK_NUMPAD_ENTER:
cmd = self.get_selected_command()
if cmd and self.on_select_callback:
self.on_select_callback(cmd)
self.safe_dismiss()
else:
event.Skip()
def on_kill_focus(self, event):
"""Handle focus loss."""
focused = wx.Window.FindFocus()
if focused and (focused == self.list_ctrl or self.IsDescendant(focused)):
event.Skip()
return
wx.CallLater(100, self.safe_dismiss)
event.Skip()
def on_destroy(self, event):
event.Skip()
def safe_dismiss(self):
"""Safely dismiss the popup."""
if not self.IsBeingDeleted() and self.IsShown():
try:
self.Dismiss()
except Exception as e:
logger.error(f"Error dismissing command autocomplete: {e}")
def show_at_input(self, input_ctrl):
"""Show popup near the input control."""
input_screen_pos = input_ctrl.ClientToScreen((0, 0))
popup_size = self.GetSize()
display_size = wx.GetDisplaySize()
# Show above the input, otherwise below
if input_screen_pos.y - popup_size.height > 0:
popup_y = input_screen_pos.y - popup_size.height - 2
else:
popup_y = input_screen_pos.y + input_ctrl.GetSize().height + 2
# Keep popup on screen horizontally
popup_x = max(10, min(input_screen_pos.x, display_size.x - popup_size.width - 10))
self.Position((popup_x, popup_y), (0, 0))
self.Popup()

View File

@@ -5,358 +5,13 @@ import logging
from SearchDialog import SearchDialog
import traceback
import KaomojiPicker as KaomojiPicker
import CommandAutocomplete as CommandAutocomplete
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class KaomojiPicker(wx.PopupTransientWindow):
def __init__(self, parent, kaomoji_groups, on_select_callback):
super().__init__(parent, wx.BORDER_SIMPLE)
self.on_select_callback = on_select_callback
self.kaomoji_groups = kaomoji_groups
self._init_ui()
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
def _init_ui(self):
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Scrolled content area
self.scroll = wx.ScrolledWindow(panel, size=(380, 420), style=wx.VSCROLL)
self.scroll.SetScrollRate(0, 15)
self.scroll_sizer = wx.BoxSizer(wx.VERTICAL)
# Storage for filtering
self.all_buttons = []
self.group_headers = {}
self.group_containers = {}
# Build kaomoji groups
self._build_kaomoji_groups()
self.scroll.SetSizer(self.scroll_sizer)
main_sizer.Add(self.scroll, 1, wx.EXPAND)
panel.SetSizer(main_sizer)
main_sizer.Fit(panel)
self.SetClientSize(panel.GetBestSize())
# Bind events
self._bind_events(panel)
def _build_kaomoji_groups(self):
dc = wx.ClientDC(self.scroll)
dc.SetFont(self.scroll.GetFont())
self.scroll_sizer.AddSpacer(8)
for group_name, kaomojis in self.kaomoji_groups.items():
if not kaomojis:
continue
# Group header
header = self._create_group_header(group_name)
self.group_headers[group_name] = header
self.scroll_sizer.Add(header, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 12)
# Wrap sizer for buttons
wrap_sizer = wx.WrapSizer(wx.HORIZONTAL)
for kaomoji in kaomojis:
btn = self._create_kaomoji_button(kaomoji, dc, group_name, wrap_sizer)
self.all_buttons.append(btn)
wrap_sizer.Add(btn, 0, wx.ALL, 3)
self.group_containers[group_name] = wrap_sizer
self.scroll_sizer.Add(wrap_sizer, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 12)
def _create_group_header(self, group_name):
header = wx.StaticText(self.scroll, label=group_name)
font = header.GetFont()
font.SetWeight(wx.FONTWEIGHT_BOLD)
font.SetPointSize(9)
header.SetFont(font)
return header
def _create_kaomoji_button(self, kaomoji, dc, group_name, wrap_sizer):
text_width, text_height = dc.GetTextExtent(kaomoji)
btn_width = min(max(text_width + 20, 55), 140)
btn_height = text_height + 14
btn = wx.Button(self.scroll, label=kaomoji, size=(btn_width, btn_height))
btn.SetToolTip(f"{kaomoji} - {group_name}")
btn._kaomoji_text = kaomoji.lower()
btn._kaomoji_group = group_name.lower()
btn._group_name = group_name
# Bind click event
btn.Bind(wx.EVT_BUTTON, lambda evt, k=kaomoji: self.on_kaomoji_selected(k))
return btn
def _bind_events(self, panel):
panel.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
def on_kaomoji_selected(self, kaomoji):
try:
if self.on_select_callback:
self.on_select_callback(kaomoji)
except Exception as e:
logger.error(f"Error in kaomoji selection callback: {e}")
finally:
# Safely dismiss the popup
wx.CallAfter(self.safe_dismiss)
def on_search(self, event):
try:
search_text = self.search_ctrl.GetValue().lower().strip()
if not search_text:
self._show_all()
return
visible_groups = set()
for btn in self.all_buttons:
# Match kaomoji text or group name
is_match = (search_text in btn._kaomoji_text or
search_text in btn._kaomoji_group)
btn.Show(is_match)
if is_match:
visible_groups.add(btn._group_name)
# Show/hide group headers
for group_name, header in self.group_headers.items():
header.Show(group_name in visible_groups)
self.scroll.Layout()
self.scroll.FitInside()
except Exception as e:
logger.error(f"Error in kaomoji search: {e}")
def on_clear_search(self, event):
self.search_ctrl.SetValue("")
self._show_all()
def _show_all(self):
for btn in self.all_buttons:
btn.Show()
for header in self.group_headers.values():
header.Show()
self.scroll.Layout()
self.scroll.FitInside()
def on_char_hook(self, event):
if event.GetKeyCode() == wx.WXK_ESCAPE:
self.safe_dismiss()
else:
event.Skip()
def on_kill_focus(self, event):
focused = wx.Window.FindFocus()
if focused and (focused == self.search_ctrl or
focused.GetParent() == self.scroll or
self.scroll.IsDescendant(focused)):
event.Skip()
return
wx.CallLater(100, self.safe_dismiss)
event.Skip()
def on_destroy(self, event):
self.all_buttons.clear()
self.group_headers.clear()
self.group_containers.clear()
event.Skip()
def safe_dismiss(self):
if not self.IsBeingDeleted() and self.IsShown():
try:
self.Dismiss()
except Exception as e:
logger.error(f"Error dismissing popup: {e}")
def show_at_button(self, button):
btn_screen_pos = button.ClientToScreen((0, 0))
popup_size = self.GetSize()
display_size = wx.GetDisplaySize()
# Try to show above the button, otherwise below
if btn_screen_pos.y - popup_size.height > 0:
popup_y = btn_screen_pos.y - popup_size.height
else:
popup_y = btn_screen_pos.y + button.GetSize().height
# Keep popup on screen horizontally
popup_x = max(10, min(btn_screen_pos.x, display_size.x - popup_size.width - 10))
self.Position((popup_x, popup_y), (0, 0))
self.Popup()
class CommandAutocomplete(wx.PopupTransientWindow):
"""Popup window for IRC command autocomplete, similar to Minecraft."""
def __init__(self, parent, commands, on_select_callback):
super().__init__(parent, wx.BORDER_SIMPLE)
self.on_select_callback = on_select_callback
self.commands = commands # List of (command, description) tuples
self.filtered_commands = commands.copy()
self.selected_index = 0
self._init_ui()
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
def _init_ui(self):
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Command list
self.list_ctrl = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER)
self.list_ctrl.InsertColumn(0, "Command", width=120)
self.list_ctrl.InsertColumn(1, "Description", width=280)
self._update_list()
# Bind events
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_item_activated)
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
self.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
main_sizer.Add(self.list_ctrl, 1, wx.EXPAND | wx.ALL, 2)
panel.SetSizer(main_sizer)
main_sizer.Fit(panel)
self.SetClientSize(panel.GetBestSize())
# Select first item
if self.list_ctrl.GetItemCount() > 0:
self.list_ctrl.Select(0)
self.selected_index = 0
def _update_list(self):
"""Update the list with filtered commands."""
self.list_ctrl.DeleteAllItems()
for cmd, desc in self.filtered_commands:
idx = self.list_ctrl.InsertItem(self.list_ctrl.GetItemCount(), cmd)
self.list_ctrl.SetItem(idx, 1, desc)
# Resize to fit content (max 8 items visible)
item_height = 20
max_items = min(8, len(self.filtered_commands))
self.list_ctrl.SetSize((410, item_height * max_items + 4))
self.SetClientSize((410, item_height * max_items + 4))
def filter_commands(self, search_text):
"""Filter commands based on search text."""
search_lower = search_text.lower().strip()
if not search_lower:
self.filtered_commands = self.commands.copy()
else:
self.filtered_commands = [
(cmd, desc) for cmd, desc in self.commands
if cmd.lower().startswith(search_lower)
]
self.selected_index = 0
self._update_list()
if self.list_ctrl.GetItemCount() > 0:
self.list_ctrl.Select(0)
def on_item_activated(self, event):
"""Handle double-click or Enter on item."""
idx = event.GetIndex()
if 0 <= idx < len(self.filtered_commands):
cmd, _ = self.filtered_commands[idx]
if self.on_select_callback:
self.on_select_callback(cmd)
self.safe_dismiss()
def on_item_selected(self, event):
"""Handle item selection."""
self.selected_index = event.GetIndex()
def select_next(self):
"""Select next item."""
if self.list_ctrl.GetItemCount() > 0:
self.selected_index = (self.selected_index + 1) % self.list_ctrl.GetItemCount()
self.list_ctrl.Select(self.selected_index)
self.list_ctrl.EnsureVisible(self.selected_index)
def select_previous(self):
"""Select previous item."""
if self.list_ctrl.GetItemCount() > 0:
self.selected_index = (self.selected_index - 1) % self.list_ctrl.GetItemCount()
self.list_ctrl.Select(self.selected_index)
self.list_ctrl.EnsureVisible(self.selected_index)
def get_selected_command(self):
"""Get the currently selected command."""
if 0 <= self.selected_index < len(self.filtered_commands):
return self.filtered_commands[self.selected_index][0]
return None
def on_char_hook(self, event):
"""Handle keyboard events."""
keycode = event.GetKeyCode()
if keycode == wx.WXK_ESCAPE:
self.safe_dismiss()
elif keycode == wx.WXK_UP:
self.select_previous()
elif keycode == wx.WXK_DOWN:
self.select_next()
elif keycode == wx.WXK_RETURN or keycode == wx.WXK_NUMPAD_ENTER:
cmd = self.get_selected_command()
if cmd and self.on_select_callback:
self.on_select_callback(cmd)
self.safe_dismiss()
else:
event.Skip()
def on_kill_focus(self, event):
"""Handle focus loss."""
focused = wx.Window.FindFocus()
if focused and (focused == self.list_ctrl or self.IsDescendant(focused)):
event.Skip()
return
wx.CallLater(100, self.safe_dismiss)
event.Skip()
def on_destroy(self, event):
event.Skip()
def safe_dismiss(self):
"""Safely dismiss the popup."""
if not self.IsBeingDeleted() and self.IsShown():
try:
self.Dismiss()
except Exception as e:
logger.error(f"Error dismissing command autocomplete: {e}")
def show_at_input(self, input_ctrl):
"""Show popup near the input control."""
input_screen_pos = input_ctrl.ClientToScreen((0, 0))
popup_size = self.GetSize()
display_size = wx.GetDisplaySize()
# Show above the input, otherwise below
if input_screen_pos.y - popup_size.height > 0:
popup_y = input_screen_pos.y - popup_size.height - 2
else:
popup_y = input_screen_pos.y + input_ctrl.GetSize().height + 2
# Keep popup on screen horizontally
popup_x = max(10, min(input_screen_pos.x, display_size.x - popup_size.width - 10))
self.Position((popup_x, popup_y), (0, 0))
self.Popup()
class IRCPanel(wx.Panel):
# IRC commands with descriptions
IRC_COMMANDS = [
@@ -561,9 +216,13 @@ class IRCPanel(wx.Panel):
logger.error(f"Error in add_message: {e}")
def _add_message_safe(self, message, username_color=None, message_color=None,
bold=False, italic=False, underline=False):
bold=False, italic=False, underline=False):
"""Add message to display with formatting (must be called from main thread)."""
try:
# Safety check: ensure text_ctrl still exists
if not self.text_ctrl or not self:
return
self.messages.append(message)
# Check if user is at bottom
@@ -669,7 +328,7 @@ class IRCPanel(wx.Panel):
self.current_popup.Dismiss()
# Create new popup
self.current_popup = KaomojiPicker(
self.current_popup = KaomojiPicker.KaomojiPicker(
self,
self.KAOMOJI_GROUPS,
self.on_kaomoji_insert
@@ -742,7 +401,7 @@ class IRCPanel(wx.Panel):
try:
if not self.command_popup or self.command_popup.IsBeingDeleted():
# Create new popup
self.command_popup = CommandAutocomplete(
self.command_popup = CommandAutocomplete.CommandAutocomplete( # fuckass python
self,
self.IRC_COMMANDS,
self.on_command_select

View File

@@ -0,0 +1,103 @@
import wx
import socket
import logging
logger = logging.getLogger(__name__)
class InterfaceSelectDialog(wx.Dialog):
"""Dialog that lets the user pick which local interface the server should bind to."""
def __init__(self, parent, current_host="127.0.0.1"):
super().__init__(parent, title="Select Network Interface", size=(420, 380))
try:
self.SetIcon(parent.GetIcon())
except Exception:
pass
self.selected_host = current_host
panel = wx.Panel(self)
panel.SetBackgroundColour(parent.theme["window_bg"])
main_sizer = wx.BoxSizer(wx.VERTICAL)
info = wx.StaticText(
panel,
label="Choose the Network interface where your server should run:\n"
"You are EXPOSING a Server to your LOCAL Network, this may give away who you are!\n",
)
info.Wrap(380)
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 10)
self.interface_list = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.interface_list.InsertColumn(0, "Interface", width=180)
self.interface_list.InsertColumn(1, "Address", width=180)
self.interfaces = self._gather_interfaces()
current_index = 0
for idx, entry in enumerate[tuple[str, str]](self.interfaces):
name, address = entry
self.interface_list.InsertItem(idx, name)
self.interface_list.SetItem(idx, 1, address)
if address == current_host:
current_index = idx
self.interface_list.Select(current_index)
self.interface_list.EnsureVisible(current_index)
if self.interfaces:
self.selected_host = self.interfaces[current_index][1]
self.interface_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_select)
self.interface_list.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_activate)
main_sizer.Add(self.interface_list, 1, wx.ALL | wx.EXPAND, 10)
button_bar = wx.StdDialogButtonSizer()
ok_btn = wx.Button(panel, wx.ID_OK)
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
button_bar.AddButton(ok_btn)
button_bar.AddButton(cancel_btn)
button_bar.Realize()
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 10)
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
panel.SetSizer(main_sizer)
def on_select(self, event):
index = event.GetIndex()
_, address = self.interfaces[index]
self.selected_host = address
def _gather_interfaces(self):
entries = [
("Loopback only", "127.0.0.1"),
("All interfaces", "0.0.0.0"),
]
try:
import psutil
seen = {addr for _, addr in entries}
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == socket.AF_INET and addr.address not in seen:
label = f"{name}"
entries.append((label, addr.address))
seen.add(addr.address)
except Exception as e:
logger.warning(f"Unable to enumerate network interfaces: {e}")
return entries
def get_selected_host(self):
return self.selected_host
def on_activate(self, event):
self.on_select(event)
self.EndModal(wx.ID_OK)
def on_ok(self, event):
index = self.interface_list.GetFirstSelected()
if index == -1 and self.interfaces:
wx.MessageBox("Select an interface before starting the server.", "No Interface Selected", wx.OK | wx.ICON_INFORMATION)
return
if index != -1:
_, address = self.interfaces[index]
self.selected_host = address
event.Skip()

197
src/KaomojiPicker.py Normal file
View File

@@ -0,0 +1,197 @@
import logging
import traceback
import wx
import wx.adv
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class KaomojiPicker(wx.PopupTransientWindow):
def __init__(self, parent, kaomoji_groups, on_select_callback):
super().__init__(parent, wx.BORDER_SIMPLE)
self.on_select_callback = on_select_callback
self.kaomoji_groups = kaomoji_groups
self._init_ui()
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
def _init_ui(self):
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Scrolled content area
self.scroll = wx.ScrolledWindow(panel, size=(380, 420), style=wx.VSCROLL)
self.scroll.SetScrollRate(0, 15)
self.scroll_sizer = wx.BoxSizer(wx.VERTICAL)
# Storage for filtering
self.all_buttons = []
self.group_headers = {}
self.group_containers = {}
# Build kaomoji groups
self._build_kaomoji_groups()
self.scroll.SetSizer(self.scroll_sizer)
main_sizer.Add(self.scroll, 1, wx.EXPAND)
panel.SetSizer(main_sizer)
main_sizer.Fit(panel)
self.SetClientSize(panel.GetBestSize())
# Bind events
self._bind_events(panel)
def _build_kaomoji_groups(self):
dc = wx.ClientDC(self.scroll)
dc.SetFont(self.scroll.GetFont())
self.scroll_sizer.AddSpacer(8)
for group_name, kaomojis in self.kaomoji_groups.items():
if not kaomojis:
continue
# Group header
header = self._create_group_header(group_name)
self.group_headers[group_name] = header
self.scroll_sizer.Add(header, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 12)
# Wrap sizer for buttons
wrap_sizer = wx.WrapSizer(wx.HORIZONTAL)
for kaomoji in kaomojis:
btn = self._create_kaomoji_button(kaomoji, dc, group_name, wrap_sizer)
self.all_buttons.append(btn)
wrap_sizer.Add(btn, 0, wx.ALL, 3)
self.group_containers[group_name] = wrap_sizer
self.scroll_sizer.Add(wrap_sizer, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 12)
def _create_group_header(self, group_name):
header = wx.StaticText(self.scroll, label=group_name)
font = header.GetFont()
font.SetWeight(wx.FONTWEIGHT_BOLD)
font.SetPointSize(9)
header.SetFont(font)
return header
def _create_kaomoji_button(self, kaomoji, dc, group_name, wrap_sizer):
text_width, text_height = dc.GetTextExtent(kaomoji)
btn_width = min(max(text_width + 20, 55), 140)
btn_height = text_height + 14
btn = wx.Button(self.scroll, label=kaomoji, size=(btn_width, btn_height))
btn.SetToolTip(f"{kaomoji} - {group_name}")
btn._kaomoji_text = kaomoji.lower()
btn._kaomoji_group = group_name.lower()
btn._group_name = group_name
# Bind click event
btn.Bind(wx.EVT_BUTTON, lambda evt, k=kaomoji: self.on_kaomoji_selected(k))
return btn
def _bind_events(self, panel):
panel.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
def on_kaomoji_selected(self, kaomoji):
try:
if self.on_select_callback:
self.on_select_callback(kaomoji)
except Exception as e:
logger.error(f"Error in kaomoji selection callback: {e}")
finally:
# Safely dismiss the popup
wx.CallAfter(self.safe_dismiss)
def on_search(self, event):
try:
search_text = self.search_ctrl.GetValue().lower().strip()
if not search_text:
self._show_all()
return
visible_groups = set()
for btn in self.all_buttons:
# Match kaomoji text or group name
is_match = (search_text in btn._kaomoji_text or
search_text in btn._kaomoji_group)
btn.Show(is_match)
if is_match:
visible_groups.add(btn._group_name)
# Show/hide group headers
for group_name, header in self.group_headers.items():
header.Show(group_name in visible_groups)
self.scroll.Layout()
self.scroll.FitInside()
except Exception as e:
logger.error(f"Error in kaomoji search: {e}")
def on_clear_search(self, event):
self.search_ctrl.SetValue("")
self._show_all()
def _show_all(self):
for btn in self.all_buttons:
btn.Show()
for header in self.group_headers.values():
header.Show()
self.scroll.Layout()
self.scroll.FitInside()
def on_char_hook(self, event):
if event.GetKeyCode() == wx.WXK_ESCAPE:
self.safe_dismiss()
else:
event.Skip()
def on_kill_focus(self, event):
focused = wx.Window.FindFocus()
if focused and (focused == self.search_ctrl or
focused.GetParent() == self.scroll or
self.scroll.IsDescendant(focused)):
event.Skip()
return
wx.CallLater(100, self.safe_dismiss)
event.Skip()
def on_destroy(self, event):
self.all_buttons.clear()
self.group_headers.clear()
self.group_containers.clear()
event.Skip()
def safe_dismiss(self):
if not self.IsBeingDeleted() and self.IsShown():
try:
self.Dismiss()
except Exception as e:
logger.error(f"Error dismissing popup: {e}")
def show_at_button(self, button):
btn_screen_pos = button.ClientToScreen((0, 0))
popup_size = self.GetSize()
display_size = wx.GetDisplaySize()
# Try to show above the button, otherwise below
if btn_screen_pos.y - popup_size.height > 0:
popup_y = btn_screen_pos.y - popup_size.height
else:
popup_y = btn_screen_pos.y + button.GetSize().height
# Keep popup on screen horizontally
popup_x = max(10, min(btn_screen_pos.x, display_size.x - popup_size.width - 10))
self.Position((popup_x, popup_y), (0, 0))
self.Popup()

View File

@@ -1,18 +1,11 @@
import ipaddress
import logging
import socket
import threading
from typing import Callable, Iterable, List, Optional
from irc import server as irc_server
logger = logging.getLogger(__name__)
def _default_log_callback(message: str, color=None, bold: bool = False):
"""Fallback logger when UI callback is not available."""
logger.info(message)
class LocalOnlyIRCServer(irc_server.IRCServer):
"""IRC server that only accepts connections from local/LAN addresses."""
@@ -44,170 +37,66 @@ class LocalOnlyIRCServer(irc_server.IRCServer):
return False
class LocalServerManager:
"""Manages the background IRC server lifecycle."""
DEFAULT_CHANNELS = ["#lobby"]
DEFAULT_ALLOWED_NETWORKS = [
ipaddress.ip_network("127.0.0.0/8"), # Loopback
ipaddress.ip_network("10.0.0.0/8"), # RFC1918
ipaddress.ip_network("172.16.0.0/12"), # RFC1918
ipaddress.ip_network("192.168.0.0/16"), # RFC1918
ipaddress.ip_network("169.254.0.0/16"), # Link-local
]
if __name__ == "__main__":
import argparse
import sys
import signal
def __init__(
self,
log_callback: Callable[[str, Optional[object], bool], None] = _default_log_callback,
listen_host: str = "0.0.0.0",
listen_port: int = 6667,
):
self.log_callback = log_callback or _default_log_callback
self.listen_host = listen_host
self.listen_port = listen_port
self.allowed_networks = list(self.DEFAULT_ALLOWED_NETWORKS)
self._channels = list(self.DEFAULT_CHANNELS)
def main():
parser = argparse.ArgumentParser(
description="Run a local-only IRC server."
)
parser.add_argument(
"--host", type=str, default="0.0.0.0",
help="Bind host (default: 0.0.0.0)"
)
parser.add_argument(
"--port", type=int, default=6667,
help="Bind port (default: 6667)"
)
parser.add_argument(
"--channels", type=str, default="#lobby",
help="Comma-separated list of channels (default: #lobby)"
)
parser.add_argument(
"--verbose", action="store_true",
help="Enable verbose logging"
)
self._server: Optional[LocalOnlyIRCServer] = None
self._thread: Optional[threading.Thread] = None
self._lock = threading.RLock()
self._running = threading.Event()
self._ready = threading.Event()
self._error: Optional[Exception] = None
args = parser.parse_args()
# Public API ---------------------------------------------------------
def start(self, timeout: float = 5.0):
"""Start the background IRC server."""
with self._lock:
if self._running.is_set():
raise RuntimeError("Local IRC server is already running.")
# Set up logging
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
self._running.set()
self._ready.clear()
self._error = None
# Initialize the server manager
manager = LocalServerManager( # pyright: ignore[reportUndefinedVariable]
listen_host=args.host,
listen_port=args.port
)
manager.set_channels([ch.strip() for ch in args.channels.split(",") if ch.strip()])
self._thread = threading.Thread(
target=self._serve_forever,
name="Local-IRC-Server",
daemon=True,
)
self._thread.start()
# Handle Ctrl+C gracefully
def signal_handler(sig, frame):
print("\nStopping server...")
manager.stop()
sys.exit(0)
if not self._ready.wait(timeout):
self._running.clear()
raise TimeoutError("Local IRC server failed to start in time.")
signal.signal(signal.SIGINT, signal_handler)
if self._error:
raise self._error
def stop(self, timeout: float = 5.0):
"""Stop the IRC server if it is running."""
with self._lock:
if not self._running.is_set():
return
server = self._server
thread = self._thread
if server:
server.shutdown()
server.server_close()
if thread:
thread.join(timeout=timeout)
with self._lock:
self._server = None
self._thread = None
self._running.clear()
self._ready.clear()
def is_running(self) -> bool:
return self._running.is_set()
def set_listen_host(self, host: str):
"""Update the bind address. Local server must be stopped."""
with self._lock:
if self._running.is_set():
raise RuntimeError("Stop the server before changing the interface.")
self.listen_host = host
self._log(f"Local server interface set to {host}.")
def get_channels(self) -> List[str]:
with self._lock:
return list(self._channels)
def set_channels(self, channels: Iterable[str]):
cleaned = self._sanitize_channels(channels)
with self._lock:
self._channels = cleaned or list(self.DEFAULT_CHANNELS)
if self.is_running():
self._log(
"Channel list updated. Restart local server to apply changes.",
bold=True,
)
# Internal helpers ---------------------------------------------------
def _serve_forever(self):
try:
server = LocalOnlyIRCServer(
(self.listen_host, self.listen_port),
irc_server.IRCClient,
self.allowed_networks,
blocked_callback=lambda ip: self._log(
f"Blocked connection attempt from {ip}", bold=False
),
)
server.servername = socket.gethostname() or "wxirc-local"
with self._lock:
self._server = server
seed_channels = self._channels or list(self.DEFAULT_CHANNELS)
for channel in seed_channels:
server.channels.setdefault(channel, irc_server.IRCChannel(channel))
self._log(
f"Local IRC server listening on {self.listen_host}:{self.listen_port}",
bold=True,
)
self._ready.set()
server.serve_forever()
except Exception as exc:
logger.exception("Local IRC server failed: %s", exc)
self._error = exc
self._ready.set()
self._log(f"Local server error: {exc}", bold=True)
finally:
self._running.clear()
self._log("Local IRC server stopped.")
def _sanitize_channels(self, channels: Iterable[str]) -> List[str]:
unique = []
seen = set()
for channel in channels:
if not channel:
continue
name = channel.strip()
if not name.startswith("#"):
name = f"#{name}"
if not self._is_valid_channel(name):
continue
if name.lower() not in seen:
unique.append(name)
seen.add(name.lower())
return unique
@staticmethod
def _is_valid_channel(name: str) -> bool:
if len(name) < 2:
return False
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
return all(ch in allowed for ch in name)
def _log(self, message: str, color=None, bold: bool = False):
try:
self.log_callback(message, color, bold)
except Exception:
logger.info(message)
manager.start()
print(f"IRC server running on {args.host}:{args.port}")
# Keep the main thread alive while server runs
while manager.is_running():
import time
time.sleep(1)
except Exception as e:
print(f"Error: {e}")
manager.stop()
sys.exit(1)
main()

179
src/LocalServerManager.py Normal file
View File

@@ -0,0 +1,179 @@
import ipaddress
import socket
import threading
import logging
from typing import Callable, Iterable, List, Optional
from LocalServer import LocalOnlyIRCServer
from irc import server as irc_server
logger = logging.getLogger(__name__)
def _default_log_callback(message: str, color=None, bold: bool = False):
"""Fallback logger when UI callback is not available."""
logger.info(message)
class LocalServerManager:
"""Manages the background IRC server lifecycle."""
DEFAULT_CHANNELS = ["#lobby"]
DEFAULT_ALLOWED_NETWORKS = [
ipaddress.ip_network("127.0.0.0/8"), # Loopback
ipaddress.ip_network("10.0.0.0/8"), # RFC1918
ipaddress.ip_network("172.16.0.0/12"), # RFC1918
ipaddress.ip_network("192.168.0.0/16"), # RFC1918
ipaddress.ip_network("169.254.0.0/16"), # Link-local
]
def __init__(
self,
log_callback: Callable[[str, Optional[object], bool], None] = _default_log_callback,
listen_host: str = "0.0.0.0",
listen_port: int = 6667,
):
self.log_callback = log_callback or _default_log_callback
self.listen_host = listen_host
self.listen_port = listen_port
self.allowed_networks = list(self.DEFAULT_ALLOWED_NETWORKS)
self._channels = list(self.DEFAULT_CHANNELS)
self._server = None
self._thread = None
self._lock = threading.RLock()
self._running = threading.Event()
self._ready = threading.Event()
self._error: Optional[Exception] = None
# Public API ---------------------------------------------------------
def start(self, timeout: float = 5.0):
"""Start the background IRC server."""
with self._lock:
if self._running.is_set():
raise RuntimeError("Local IRC server is already running.")
self._running.set()
self._ready.clear()
self._error = None
self._thread = threading.Thread(
target=self._serve_forever,
name="Local-IRC-Server",
daemon=True,
)
self._thread.start()
if not self._ready.wait(timeout):
self._running.clear()
raise TimeoutError("Local IRC server failed to start in time.")
if self._error:
raise self._error
def stop(self, timeout: float = 5.0):
"""Stop the IRC server if it is running."""
with self._lock:
if not self._running.is_set():
return
server = self._server
thread = self._thread
if server:
server.shutdown()
server.server_close()
if thread:
thread.join(timeout=timeout)
with self._lock:
self._server = None
self._thread = None
self._running.clear()
self._ready.clear()
def is_running(self) -> bool:
return self._running.is_set()
def set_listen_host(self, host: str):
"""Update the bind address. Local server must be stopped."""
with self._lock:
if self._running.is_set():
raise RuntimeError("Stop the server before changing the interface.")
self.listen_host = host
self._log(f"Local server interface set to {host}.")
def get_channels(self) -> List[str]:
with self._lock:
return list(self._channels)
def set_channels(self, channels: Iterable[str]):
cleaned = self._sanitize_channels(channels)
with self._lock:
self._channels = cleaned or list(self.DEFAULT_CHANNELS)
if self.is_running():
self._log(
"Channel list updated. Restart local server to apply changes.",
bold=True,
)
# Internal helpers ---------------------------------------------------
def _serve_forever(self):
try:
server = LocalOnlyIRCServer(
(self.listen_host, self.listen_port),
irc_server.IRCClient,
self.allowed_networks,
blocked_callback=lambda ip: self._log(
f"Blocked connection attempt from {ip}", bold=False
),
)
server.servername = socket.gethostname() or "wxirc-local"
with self._lock:
self._server = server
seed_channels = self._channels or list(self.DEFAULT_CHANNELS)
for channel in seed_channels:
server.channels.setdefault(channel, irc_server.IRCChannel(channel))
self._log(
f"Local IRC server listening on {self.listen_host}:{self.listen_port}",
bold=True,
)
self._ready.set()
server.serve_forever()
except Exception as exc:
logger.exception("Local IRC server failed: %s", exc)
self._error = exc
self._ready.set()
self._log(f"Local server error: {exc}", bold=True)
finally:
self._running.clear()
self._log("Local IRC server stopped.")
def _sanitize_channels(self, channels: Iterable[str]) -> List[str]:
unique = []
seen = set()
for channel in channels:
if not channel:
continue
name = channel.strip()
if not name.startswith("#"):
name = f"#{name}"
if not self._is_valid_channel(name):
continue
if name.lower() not in seen:
unique.append(name)
seen.add(name.lower())
return unique
@staticmethod
def _is_valid_channel(name: str) -> bool:
if len(name) < 2:
return False
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
return all(ch in allowed for ch in name)
def _log(self, message: str, color=None, bold: bool = False):
try:
self.log_callback(message, color, bold)
except Exception:
logger.info(message)

104
src/ManageChannelsDialog.py Normal file
View File

@@ -0,0 +1,104 @@
import wx
import wx.adv
class ManageChannelsDialog(wx.Dialog):
"""Simple dialog for curating the local server channel allowlist."""
def __init__(self, parent, channels):
super().__init__(parent, title="Manage Local Channels", size=(360, 420))
try:
self.SetIcon(parent.GetIcon())
except Exception:
pass
panel = wx.Panel(self)
panel.SetBackgroundColour(parent.theme["window_bg"])
main_sizer = wx.BoxSizer(wx.VERTICAL)
info = wx.StaticText(
panel,
label="Channels are shared with anyone on your LAN who joins the built-in server.",
)
info.Wrap(320)
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 8)
self.list_box = wx.ListBox(panel)
for channel in channels:
self.list_box.Append(channel)
main_sizer.Add(self.list_box, 1, wx.ALL | wx.EXPAND, 8)
input_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.channel_input = wx.TextCtrl(panel, style=wx.TE_PROCESS_ENTER)
self.channel_input.SetHint("#channel-name")
add_btn = wx.Button(panel, label="Add")
input_sizer.Add(self.channel_input, 1, wx.RIGHT, 4)
input_sizer.Add(add_btn, 0)
main_sizer.Add(input_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 8)
remove_btn = wx.Button(panel, label="Remove Selected")
reset_btn = wx.Button(panel, label="Reset to #lobby")
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
btn_sizer.Add(remove_btn, 1, wx.RIGHT, 4)
btn_sizer.Add(reset_btn, 1)
main_sizer.Add(btn_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP, 8)
button_bar = wx.StdDialogButtonSizer()
ok_btn = wx.Button(panel, wx.ID_OK)
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
button_bar.AddButton(ok_btn)
button_bar.AddButton(cancel_btn)
button_bar.Realize()
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 8)
panel.SetSizer(main_sizer)
# Bindings
add_btn.Bind(wx.EVT_BUTTON, self.on_add)
remove_btn.Bind(wx.EVT_BUTTON, self.on_remove)
reset_btn.Bind(wx.EVT_BUTTON, self.on_reset)
self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_add)
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
def get_channels(self):
return [self.list_box.GetString(i) for i in range(self.list_box.GetCount())]
def on_add(self, event):
value = self.channel_input.GetValue().strip()
if not value:
return
if not value.startswith('#'):
value = f"#{value}"
if not self._is_valid_channel(value):
wx.MessageBox(
"Channel names may contain letters, numbers, -, and _ only.",
"Invalid Channel",
wx.OK | wx.ICON_WARNING,
)
return
if self.list_box.FindString(value) != wx.NOT_FOUND:
wx.MessageBox("Channel already exists.", "Duplicate Channel", wx.OK | wx.ICON_INFORMATION)
return
self.list_box.Append(value)
self.channel_input.Clear()
def on_remove(self, event):
selection = self.list_box.GetSelection()
if selection != wx.NOT_FOUND:
self.list_box.Delete(selection)
def on_reset(self, event):
self.list_box.Clear()
self.list_box.Append("#lobby")
def on_ok(self, event):
if self.list_box.GetCount() == 0:
wx.MessageBox("Add at least one channel before saving.", "No Channels", wx.OK | wx.ICON_INFORMATION)
return
event.Skip()
@staticmethod
def _is_valid_channel(name):
if len(name) < 2:
return False
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
return all(ch in allowed for ch in name)

120
src/ScanHandler.py Normal file
View File

@@ -0,0 +1,120 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
import socket
import threading
import psutil
import ipaddress
import logging
logger = logging.getLogger(__name__)
class ScanHandler:
"""Fast local network IRC scanner with minimal network overhead."""
def __init__(self, timeout=0.35, max_workers=64):
self.timeout = timeout
self.max_workers = max_workers
self._stop_event = threading.Event()
self._thread = None
self.results = []
self.total_hosts = 0
def detect_networks(self):
"""Return private IPv4 networks discovered on local interfaces."""
networks = []
try:
for iface, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family != socket.AF_INET or not addr.netmask:
continue
try:
interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}")
except ValueError:
continue
if not interface.network.is_private:
continue
network = self._cap_network(interface)
label = f"{iface} : {interface.ip} through {network.with_prefixlen} range"
networks.append({"label": label, "cidr": str(network)})
except Exception as exc:
logger.error("Failed to enumerate interfaces: %s", exc)
if not networks:
default_net = "192.168.1.0/24"
label = f"Default guess : {default_net}"
networks.append({"label": label, "cidr": default_net})
return networks
def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None):
"""Launch the threaded scan."""
if self._thread and self._thread.is_alive():
return False
try:
network = ipaddress.ip_network(network_cidr, strict=False)
except ValueError:
return False
hosts = [str(host) for host in network.hosts()]
if not hosts:
hosts = [str(network.network_address)]
self.total_hosts = len(hosts)
self.results.clear()
self._stop_event.clear()
def _worker():
logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr)
scanned = 0
try:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self._probe_host, host, ports): host for host in hosts}
for future in as_completed(futures):
if self._stop_event.is_set():
break
scanned += 1
server_info = future.result()
if server_info:
self.results.append(server_info)
if result_cb:
result_cb(server_info)
if progress_cb:
progress_cb(scanned, self.total_hosts)
except Exception as exc:
logger.error("Scan failure: %s", exc)
finally:
if done_cb:
done_cb(self.results)
logger.info("IRC scan finished (%s discovered)", len(self.results))
self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True)
self._thread.start()
return True
def stop_scan(self):
self._stop_event.set()
def _probe_host(self, host, ports):
if self._stop_event.is_set():
return None
for port in ports:
try:
with socket.create_connection((host, port), timeout=self.timeout) as sock:
sock.settimeout(0.2)
banner = ""
try:
chunk = sock.recv(256)
if chunk:
banner = chunk.decode(errors="ignore").strip()
except socket.timeout:
banner = "IRC server (silent banner)"
except OSError:
pass
return {"address": host, "port": port, "banner": banner or "IRC server detected"}
except (socket.timeout, ConnectionRefusedError, OSError):
continue
return None
@staticmethod
def _cap_network(interface):
"""Cap huge networks to /24 to keep scans lightweight."""
if interface.network.prefixlen >= 24:
return interface.network
return ipaddress.ip_network(f"{interface.ip}/24", strict=False)

View File

@@ -1,299 +1,19 @@
import ipaddress
import logging
import socket
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import ScanHandler
import ScanWizardIntroPage
import ScanWizardResultsPage
import psutil
import wx
import wx.adv as adv
logger = logging.getLogger(__name__)
class ScanHandler:
"""Fast local network IRC scanner with minimal network overhead."""
def __init__(self, timeout=0.35, max_workers=64):
self.timeout = timeout
self.max_workers = max_workers
self._stop_event = threading.Event()
self._thread = None
self.results = []
self.total_hosts = 0
def detect_networks(self):
"""Return private IPv4 networks discovered on local interfaces."""
networks = []
try:
for iface, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family != socket.AF_INET or not addr.netmask:
continue
try:
interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}")
except ValueError:
continue
if not interface.network.is_private:
continue
network = self._cap_network(interface)
label = f"{iface} : {interface.ip} through {network.with_prefixlen} range"
networks.append({"label": label, "cidr": str(network)})
except Exception as exc:
logger.error("Failed to enumerate interfaces: %s", exc)
if not networks:
default_net = "192.168.1.0/24"
label = f"Default guess : {default_net}"
networks.append({"label": label, "cidr": default_net})
return networks
def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None):
"""Launch the threaded scan."""
if self._thread and self._thread.is_alive():
return False
try:
network = ipaddress.ip_network(network_cidr, strict=False)
except ValueError:
return False
hosts = [str(host) for host in network.hosts()]
if not hosts:
hosts = [str(network.network_address)]
self.total_hosts = len(hosts)
self.results.clear()
self._stop_event.clear()
def _worker():
logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr)
scanned = 0
try:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(self._probe_host, host, ports): host for host in hosts}
for future in as_completed(futures):
if self._stop_event.is_set():
break
scanned += 1
server_info = future.result()
if server_info:
self.results.append(server_info)
if result_cb:
result_cb(server_info)
if progress_cb:
progress_cb(scanned, self.total_hosts)
except Exception as exc:
logger.error("Scan failure: %s", exc)
finally:
if done_cb:
done_cb(self.results)
logger.info("IRC scan finished (%s discovered)", len(self.results))
self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True)
self._thread.start()
return True
def stop_scan(self):
self._stop_event.set()
def _probe_host(self, host, ports):
if self._stop_event.is_set():
return None
for port in ports:
try:
with socket.create_connection((host, port), timeout=self.timeout) as sock:
sock.settimeout(0.2)
banner = ""
try:
chunk = sock.recv(256)
if chunk:
banner = chunk.decode(errors="ignore").strip()
except socket.timeout:
banner = "IRC server (silent banner)"
except OSError:
pass
return {"address": host, "port": port, "banner": banner or "IRC server detected"}
except (socket.timeout, ConnectionRefusedError, OSError):
continue
return None
@staticmethod
def _cap_network(interface):
"""Cap huge networks to /24 to keep scans lightweight."""
if interface.network.prefixlen >= 24:
return interface.network
return ipaddress.ip_network(f"{interface.ip}/24", strict=False)
class ScanWizardIntroPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler):
super().__init__(parent)
self.scan_handler = scan_handler
self.networks = self.scan_handler.detect_networks()
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.")
intro.Wrap(420)
sizer.Add(intro, 0, wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8)
labels = [net["label"] for net in self.networks]
self.network_choice = wx.Choice(self, choices=labels)
if labels:
self.network_choice.SetSelection(0)
sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8)
self.port_ctrl = wx.TextCtrl(self, value="6667,6697")
sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5)
self.SetSizer(sizer)
def get_scan_params(self):
selection = self.network_choice.GetSelection()
if selection == wx.NOT_FOUND:
wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING)
return None
cidr = self.networks[selection]["cidr"]
raw_ports = self.port_ctrl.GetValue().split(",")
ports = []
for raw in raw_ports:
raw = raw.strip()
if not raw:
continue
try:
port = int(raw)
if 1 <= port <= 65535:
ports.append(port)
except ValueError:
continue
if not ports:
wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING)
return None
try:
network = ipaddress.ip_network(cidr, strict=False)
except ValueError:
wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR)
return None
host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1)
return {"cidr": str(network), "ports": ports, "host_count": host_count}
class ScanWizardResultsPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler, main_frame):
super().__init__(parent)
self.scan_handler = scan_handler
self.main_frame = main_frame
self.discovered = []
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
self.summary = wx.StaticText(self, label="Waiting to start…")
sizer.Add(self.summary, 0, wx.ALL, 5)
self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH)
sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5)
self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.results_list.InsertColumn(0, "Address", width=140)
self.results_list.InsertColumn(1, "Port", width=60)
self.results_list.InsertColumn(2, "Details", width=260)
self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons)
self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons)
sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5)
btn_row = wx.BoxSizer(wx.HORIZONTAL)
self.quick_connect_btn = wx.Button(self, label="Quick Connect")
self.quick_connect_btn.Disable()
self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect)
btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5)
self.rescan_btn = wx.Button(self, label="Rescan")
self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan)
btn_row.Add(self.rescan_btn, 0)
sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5)
self.SetSizer(sizer)
def prepare_for_scan(self, params, start_callback):
self.results_list.DeleteAllItems()
self.discovered = []
self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}")
self.gauge.SetRange(max(params["host_count"], 1))
self.gauge.SetValue(0)
self.quick_connect_btn.Disable()
start_callback(params)
def on_scan_progress(self, scanned, total):
try:
total = max(total, 1)
self.gauge.SetRange(total)
self.gauge.SetValue(min(scanned, total))
self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked")
except RuntimeError:
# C++ SHIT
logger.debug("Scan progress update after controls destroyed; ignoring")
def on_scan_result(self, server_info):
"""Handle a single discovered server row."""
try:
idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"])
self.results_list.SetItem(idx, 1, str(server_info["port"]))
self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected"))
self.discovered.append(server_info)
self.summary.SetLabel(
f"Found {len(self.discovered)} {'server' if len(self.discovered) == 1 else 'servers'}"
)
except RuntimeError:
logger.debug("Scan result update after controls destroyed; ignoring")
def on_scan_complete(self, results):
"""Final scan completion callback."""
try:
if results:
self.summary.SetLabel(
f"Scan complete : {len(results)} "
f"{'server' if len(results) == 1 else 'servers'} ready."
)
else:
self.summary.SetLabel("Scan complete : no IRC servers discovered.")
self._toggle_buttons()
except RuntimeError:
logger.debug("Scan completion update after controls destroyed; ignoring")
def on_quick_connect(self, event):
row = self.results_list.GetFirstSelected()
if row == -1:
return
server = self.results_list.GetItemText(row, 0)
port = int(self.results_list.GetItemText(row, 1))
if self.main_frame.quick_connect(server, port):
self.GetParent().EndModal(wx.ID_OK)
def on_rescan(self, event):
wizard = self.GetParent()
wizard.ShowPage(wizard.intro_page)
def _toggle_buttons(self, event=None):
has_selection = self.results_list.GetFirstSelected() != -1
if has_selection:
self.quick_connect_btn.Enable()
else:
self.quick_connect_btn.Disable()
class ScanWizardDialog(adv.Wizard):
"""Wizard that drives the ScanHandler workflow."""
def __init__(self, parent):
super().__init__(parent, title="wxScan")
self.scan_handler = ScanHandler()
self.intro_page = ScanWizardIntroPage(self, self.scan_handler)
self.results_page = ScanWizardResultsPage(self, self.scan_handler, parent)
self.scan_handler = ScanHandler.ScanHandler()
self.intro_page = ScanWizardIntroPage.ScanWizardIntroPage(self, self.scan_handler)
self.results_page = ScanWizardResultsPage.ScanWizardResultsPage(self, self.scan_handler, parent)
self._chain_pages()
self.Bind(adv.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing)
self.Bind(adv.EVT_WIZARD_CANCEL, self.on_cancel)

View File

@@ -0,0 +1,59 @@
import wx
import wx.adv as adv
import ipaddress
class ScanWizardIntroPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler):
super().__init__(parent)
self.scan_handler = scan_handler
self.networks = self.scan_handler.detect_networks()
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.")
intro.Wrap(420)
sizer.Add(intro, 0, wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8)
labels = [net["label"] for net in self.networks]
self.network_choice = wx.Choice(self, choices=labels)
if labels:
self.network_choice.SetSelection(0)
sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5)
sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8)
self.port_ctrl = wx.TextCtrl(self, value="6667,6697")
sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5)
self.SetSizer(sizer)
def get_scan_params(self):
selection = self.network_choice.GetSelection()
if selection == wx.NOT_FOUND:
wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING)
return None
cidr = self.networks[selection]["cidr"]
raw_ports = self.port_ctrl.GetValue().split(",")
ports = []
for raw in raw_ports:
raw = raw.strip()
if not raw:
continue
try:
port = int(raw)
if 1 <= port <= 65535:
ports.append(port)
except ValueError:
continue
if not ports:
wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING)
return None
try:
network = ipaddress.ip_network(cidr, strict=False)
except ValueError:
wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR)
return None
host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1)
return {"cidr": str(network), "ports": ports, "host_count": host_count}

View File

@@ -0,0 +1,109 @@
import wx
import wx.adv as adv
import logging
logger = logging.getLogger(__name__)
class ScanWizardResultsPage(adv.WizardPageSimple):
def __init__(self, parent, scan_handler, main_frame):
super().__init__(parent)
self.scan_handler = scan_handler
self.main_frame = main_frame
self.discovered = []
self._build_ui()
def _build_ui(self):
sizer = wx.BoxSizer(wx.VERTICAL)
self.summary = wx.StaticText(self, label="Waiting to start…")
sizer.Add(self.summary, 0, wx.ALL, 5)
self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH)
sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5)
self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.results_list.InsertColumn(0, "Address", width=140)
self.results_list.InsertColumn(1, "Port", width=60)
self.results_list.InsertColumn(2, "Details", width=260)
self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons)
self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons)
sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5)
btn_row = wx.BoxSizer(wx.HORIZONTAL)
self.quick_connect_btn = wx.Button(self, label="Quick Connect")
self.quick_connect_btn.Disable()
self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect)
btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5)
self.rescan_btn = wx.Button(self, label="Rescan")
self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan)
btn_row.Add(self.rescan_btn, 0)
sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5)
self.SetSizer(sizer)
def prepare_for_scan(self, params, start_callback):
self.results_list.DeleteAllItems()
self.discovered = []
self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}")
self.gauge.SetRange(max(params["host_count"], 1))
self.gauge.SetValue(0)
self.quick_connect_btn.Disable()
start_callback(params)
def on_scan_progress(self, scanned, total):
try:
total = max(total, 1)
self.gauge.SetRange(total)
self.gauge.SetValue(min(scanned, total))
self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked")
except RuntimeError:
# C++ SHIT
logger.debug("Scan progress update after controls destroyed; ignoring")
def on_scan_result(self, server_info):
"""Handle a single discovered server row."""
try:
idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"])
self.results_list.SetItem(idx, 1, str(server_info["port"]))
self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected"))
self.discovered.append(server_info)
self.summary.SetLabel(
f"Found {len(self.discovered)} {'server' if len(self.discovered) == 1 else 'servers'}"
)
except RuntimeError:
logger.debug("Scan result update after controls destroyed; ignoring")
def on_scan_complete(self, results):
"""Final scan completion callback."""
try:
if results:
self.summary.SetLabel(
f"Scan complete : {len(results)} "
f"{'server' if len(results) == 1 else 'servers'} ready."
)
else:
self.summary.SetLabel("Scan complete : no IRC servers discovered.")
self._toggle_buttons()
except RuntimeError:
logger.debug("Scan completion update after controls destroyed; ignoring")
def on_quick_connect(self, event):
row = self.results_list.GetFirstSelected()
if row == -1:
return
server = self.results_list.GetItemText(row, 0)
port = int(self.results_list.GetItemText(row, 1))
if self.main_frame.quick_connect(server, port):
self.GetParent().EndModal(wx.ID_OK)
def on_rescan(self, event):
wizard = self.GetParent()
wizard.ShowPage(wizard.intro_page)
def _toggle_buttons(self, event=None):
has_selection = self.results_list.GetFirstSelected() != -1
if has_selection:
self.quick_connect_btn.Enable()
else:
self.quick_connect_btn.Disable()

99
src/Win32API.py Normal file
View File

@@ -0,0 +1,99 @@
import ctypes
from ctypes import wintypes
import time
SW_HIDE = 0
SW_SHOW = 5
GWL_EXSTYLE = -20
WS_EX_TOOLWINDOW = 0x00000080
WS_EX_APPWINDOW = 0x00040000
WS_EX_LAYERED = 0x80000
LWA_ALPHA = 0x2
class Win32API:
def __init__(self, hwnd):
self.hwnd = hwnd
self.hidden = False
self._load_api()
def _load_api(self):
user32 = ctypes.windll.user32
self.ShowWindow = user32.ShowWindow
self.GetWindowLong = user32.GetWindowLongW
self.SetWindowLong = user32.SetWindowLongW
self._RegisterHotKey = user32.RegisterHotKey
self._SetLayeredWindowAttributes = user32.SetLayeredWindowAttributes
self.ShowWindow.argtypes = [wintypes.HWND, ctypes.c_int]
self.GetWindowLong.argtypes = [wintypes.HWND, ctypes.c_int]
self.SetWindowLong.argtypes = [wintypes.HWND, ctypes.c_int, ctypes.c_long]
self._RegisterHotKey.argtypes = [
wintypes.HWND,
wintypes.INT,
wintypes.UINT,
wintypes.UINT,
]
self._SetLayeredWindowAttributes.argtypes = [
wintypes.HWND,
wintypes.COLORREF,
wintypes.BYTE,
wintypes.DWORD,
]
def remove_from_taskbar(self):
style = self.GetWindowLong(self.hwnd, GWL_EXSTYLE)
new_style = (style & ~WS_EX_APPWINDOW) | WS_EX_TOOLWINDOW
self.SetWindowLong(self.hwnd, GWL_EXSTYLE, new_style)
def hide(self):
self.ShowWindow(self.hwnd, SW_HIDE)
self.hidden = True
def show(self):
self.ShowWindow(self.hwnd, SW_SHOW)
self.hidden = False
def toggle(self, fade=True, duration=500, steps=20):
"""Toggle window visibility with optional fade effect"""
if self.hidden:
if fade:
self.fade_in(duration, steps)
else:
self.show()
else:
if fade:
self.fade_out(duration, steps)
else:
self.hide()
def register_hotkey(self, hotkey_id, key, modifiers):
vk = ord(key.upper())
return self._RegisterHotKey(None, hotkey_id, modifiers, vk)
def _set_layered(self):
style = self.GetWindowLong(self.hwnd, GWL_EXSTYLE)
self.SetWindowLong(self.hwnd, GWL_EXSTYLE, style | WS_EX_LAYERED)
def fade_in(self, duration=50, steps=50):
self.show()
self._set_layered()
for idx in range(steps + 1):
alpha = int(255 * (idx / steps))
self._SetLayeredWindowAttributes(self.hwnd, 0, alpha, LWA_ALPHA)
ctypes.windll.kernel32.Sleep(int(duration / steps))
self.hidden = False
def fade_out(self, duration=50, steps=50):
self._set_layered()
for idx in range(steps + 1):
alpha = int(255 * (1 - idx /steps))
self._SetLayeredWindowAttributes(self.hwnd, 0, alpha, LWA_ALPHA)
ctypes.windll.kernel32.Sleep(int(duration / steps))
self.hidden = True
self.hide()

97
src/Win32SoundHandler.py Normal file
View File

@@ -0,0 +1,97 @@
import ctypes
import logging
from ctypes import wintypes
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_resource_path(relative_path):
import os
import sys
try:
base_path = sys._MEIPASS
except Exception:
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__)))
base_path = project_root
full_path = os.path.normpath(os.path.join(base_path, relative_path))
return full_path
class Win32SoundHandler:
def __init__(self, hwnd):
self.hwnd = hwnd
self.SND_ALIAS = 0x00010000
self.SND_FILENAME = 0x00020000
self.SND_ASYNC = 0x00000001
self._setup_playsoundapi()
def _setup_playsoundapi(self):
winmm = ctypes.windll.winmm
self.PlaySoundW = winmm.PlaySoundW
self.PlaySoundW.argtypes = (
wintypes.LPCWSTR,
wintypes.HMODULE,
wintypes.DWORD
)
self.PlaySoundW.restype = wintypes.BOOL
def play_info_sound(self):
self.PlaySoundW("SystemAsterisk", None, self.SND_ALIAS | self.SND_ASYNC)
def play_error_sound(self):
self.PlaySoundW("SystemHand", None, self.SND_ALIAS | self.SND_ASYNC)
def play_warn_sound(self):
self.PlaySoundW("SystemExclamation", None, self.SND_ALIAS | self.SND_ASYNC)
def play_question_sound(self):
self.PlaySoundW("SystemQuestion", None, self.SND_ALIAS | self.SND_ASYNC)
def play_default_sound(self):
self.PlaySoundW("SystemDefault", None, self.SND_ALIAS | self.SND_ASYNC)
def play_notification_sound(self):
self.PlaySoundW("SystemNotification", None, self.SND_ALIAS | self.SND_ASYNC)
def play_mail_sound(self):
self.PlaySoundW("MailBeep", None, self.SND_ALIAS | self.SND_ASYNC)
def play_exit_sound(self):
self.PlaySoundW("SystemExit", None, self.SND_ALIAS | self.SND_ASYNC)
def play_connect_server_or_channel(self):
try:
sound_path = get_resource_path("sounds/space-pdj.wav")
import os
if os.path.exists(sound_path):
self.PlaySoundW(sound_path, None, self.SND_FILENAME | self.SND_ASYNC)
else:
logger.warning(f"Sound file not found: {sound_path}")
except Exception as e:
logger.error(f"Error playing popout click sound: {e}")
def play_popout_click(self):
try:
sound_path = get_resource_path("sounds/startup.wav")
import os
if os.path.exists(sound_path):
self.PlaySoundW(sound_path, None, self.SND_FILENAME | self.SND_ASYNC)
else:
logger.warning(f"Sound file not found: {sound_path}")
except Exception as e:
logger.error(f"Error playing popout click sound: {e}")
def play_msg_recv(self):
try:
sound_path = get_resource_path("sounds/balloon.wav")
import os
if os.path.exists(sound_path):
self.PlaySoundW(sound_path, None, self.SND_FILENAME | self.SND_ASYNC)
else:
logger.warning(f"Sound file not found: {sound_path}")
except Exception as e:
logger.error(f"Error playing message received sound: {e}")

BIN
src/channel.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 97 KiB

View File

@@ -1,4 +1,6 @@
import wx
import wx.aui
import wx.lib.agw.aui as aui
import irc.client
import threading
import re
@@ -15,9 +17,12 @@ import sys
from PrivacyNoticeDialog import PrivacyNoticeDialog
from IRCPanel import IRCPanel
from AboutDialog import AboutDialog
from InterfaceSelectDialog import InterfaceSelectDialog
from ManageChannelsDialog import ManageChannelsDialog
from NotesDialog import NotesDialog
if os.name == "nt": from Win32API import Win32API ; from Win32SoundHandler import Win32SoundHandler
from ScanWizard import ScanWizardDialog
from LocalServer import LocalServerManager
from LocalServerManager import LocalServerManager
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -40,207 +45,6 @@ class UIUpdate:
self.args = args
self.kwargs = kwargs
class ManageChannelsDialog(wx.Dialog):
"""Simple dialog for curating the local server channel allowlist."""
def __init__(self, parent, channels):
super().__init__(parent, title="Manage Local Channels", size=(360, 420))
try:
self.SetIcon(parent.GetIcon())
except Exception:
pass
panel = wx.Panel(self)
panel.SetBackgroundColour(parent.theme["window_bg"])
main_sizer = wx.BoxSizer(wx.VERTICAL)
info = wx.StaticText(
panel,
label="Channels are shared with anyone on your LAN who joins the built-in server.",
)
info.Wrap(320)
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 8)
self.list_box = wx.ListBox(panel)
for channel in channels:
self.list_box.Append(channel)
main_sizer.Add(self.list_box, 1, wx.ALL | wx.EXPAND, 8)
input_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.channel_input = wx.TextCtrl(panel, style=wx.TE_PROCESS_ENTER)
self.channel_input.SetHint("#channel-name")
add_btn = wx.Button(panel, label="Add")
input_sizer.Add(self.channel_input, 1, wx.RIGHT, 4)
input_sizer.Add(add_btn, 0)
main_sizer.Add(input_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 8)
remove_btn = wx.Button(panel, label="Remove Selected")
reset_btn = wx.Button(panel, label="Reset to #lobby")
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
btn_sizer.Add(remove_btn, 1, wx.RIGHT, 4)
btn_sizer.Add(reset_btn, 1)
main_sizer.Add(btn_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP, 8)
button_bar = wx.StdDialogButtonSizer()
ok_btn = wx.Button(panel, wx.ID_OK)
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
button_bar.AddButton(ok_btn)
button_bar.AddButton(cancel_btn)
button_bar.Realize()
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 8)
panel.SetSizer(main_sizer)
# Bindings
add_btn.Bind(wx.EVT_BUTTON, self.on_add)
remove_btn.Bind(wx.EVT_BUTTON, self.on_remove)
reset_btn.Bind(wx.EVT_BUTTON, self.on_reset)
self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_add)
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
def get_channels(self):
return [self.list_box.GetString(i) for i in range(self.list_box.GetCount())]
def on_add(self, event):
value = self.channel_input.GetValue().strip()
if not value:
return
if not value.startswith('#'):
value = f"#{value}"
if not self._is_valid_channel(value):
wx.MessageBox(
"Channel names may contain letters, numbers, -, and _ only.",
"Invalid Channel",
wx.OK | wx.ICON_WARNING,
)
return
if self.list_box.FindString(value) != wx.NOT_FOUND:
wx.MessageBox("Channel already exists.", "Duplicate Channel", wx.OK | wx.ICON_INFORMATION)
return
self.list_box.Append(value)
self.channel_input.Clear()
def on_remove(self, event):
selection = self.list_box.GetSelection()
if selection != wx.NOT_FOUND:
self.list_box.Delete(selection)
def on_reset(self, event):
self.list_box.Clear()
self.list_box.Append("#lobby")
def on_ok(self, event):
if self.list_box.GetCount() == 0:
wx.MessageBox("Add at least one channel before saving.", "No Channels", wx.OK | wx.ICON_INFORMATION)
return
event.Skip()
@staticmethod
def _is_valid_channel(name):
if len(name) < 2:
return False
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
return all(ch in allowed for ch in name)
class InterfaceSelectDialog(wx.Dialog):
"""Dialog that lets the user pick which local interface the server should bind to."""
def __init__(self, parent, current_host="127.0.0.1"):
super().__init__(parent, title="Select Network Interface", size=(420, 380))
try:
self.SetIcon(parent.GetIcon())
except Exception:
pass
self.selected_host = current_host
panel = wx.Panel(self)
panel.SetBackgroundColour(parent.theme["window_bg"])
main_sizer = wx.BoxSizer(wx.VERTICAL)
info = wx.StaticText(
panel,
label="Choose the Network interface where your server should run:\n"
"You are EXPOSING a Server to your LOCAL Network, this may give away who you are!\n",
)
info.Wrap(380)
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 10)
self.interface_list = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
self.interface_list.InsertColumn(0, "Interface", width=180)
self.interface_list.InsertColumn(1, "Address", width=180)
self.interfaces = self._gather_interfaces()
current_index = 0
for idx, entry in enumerate[tuple[str, str]](self.interfaces):
name, address = entry
self.interface_list.InsertItem(idx, name)
self.interface_list.SetItem(idx, 1, address)
if address == current_host:
current_index = idx
self.interface_list.Select(current_index)
self.interface_list.EnsureVisible(current_index)
if self.interfaces:
self.selected_host = self.interfaces[current_index][1]
self.interface_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_select)
self.interface_list.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_activate)
main_sizer.Add(self.interface_list, 1, wx.ALL | wx.EXPAND, 10)
button_bar = wx.StdDialogButtonSizer()
ok_btn = wx.Button(panel, wx.ID_OK)
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
button_bar.AddButton(ok_btn)
button_bar.AddButton(cancel_btn)
button_bar.Realize()
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 10)
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
panel.SetSizer(main_sizer)
def on_select(self, event):
index = event.GetIndex()
_, address = self.interfaces[index]
self.selected_host = address
def _gather_interfaces(self):
entries = [
("Loopback only", "127.0.0.1"),
("All interfaces", "0.0.0.0"),
]
try:
import psutil
seen = {addr for _, addr in entries}
for name, addrs in psutil.net_if_addrs().items():
for addr in addrs:
if addr.family == socket.AF_INET and addr.address not in seen:
label = f"{name}"
entries.append((label, addr.address))
seen.add(addr.address)
except Exception as e:
logger.warning(f"Unable to enumerate network interfaces: {e}")
return entries
def get_selected_host(self):
return self.selected_host
def on_activate(self, event):
self.on_select(event)
self.EndModal(wx.ID_OK)
def on_ok(self, event):
index = self.interface_list.GetFirstSelected()
if index == -1 and self.interfaces:
wx.MessageBox("Select an interface before starting the server.", "No Interface Selected", wx.OK | wx.ICON_INFORMATION)
return
if index != -1:
_, address = self.interfaces[index]
self.selected_host = address
event.Skip()
class IRCFrame(wx.Frame):
def __init__(self):
super().__init__(None, title="wxIRC", size=(1200, 700))
@@ -251,7 +55,7 @@ class IRCFrame(wx.Frame):
# Show privacy notice first
self.show_privacy_notice()
self.reactor = None
self.connection = None
self.reactor_thread = None
@@ -271,7 +75,7 @@ class IRCFrame(wx.Frame):
self.auto_join_channels = []
self.away = False
self.timestamps = True
self.notes_data = defaultdict(dict)
self.server_menu_items = {}
self.local_bind_host = "127.0.0.1"
@@ -300,6 +104,18 @@ class IRCFrame(wx.Frame):
self.motd_lines = []
self.collecting_motd = False
self.hwnd = self.GetHandle()
self.ctrl = Win32API(self.hwnd)
# Initialize sound handler for Windows
if os.name == "nt":
self.sound_handler = Win32SoundHandler(self.hwnd)
else:
self.sound_handler = None
# Sound toggle state (enabled by default)
self.sounds_enabled = True
self.setup_irc_handlers()
self.create_menubar()
self.setup_ui()
@@ -329,6 +145,17 @@ class IRCFrame(wx.Frame):
self.Bind(wx.EVT_MENU, self.on_find_next, id=1001)
self.Bind(wx.EVT_MENU, self.on_find_previous, id=1002)
self.Bind(wx.EVT_MENU, self.on_quick_escape, id=1003)
HOTKEY_ID = 1
MOD_CONTROL = 0x0002
MOD_ALT = 0x0001
# Register directly on the wx.Frame
self.RegisterHotKey(HOTKEY_ID, MOD_CONTROL | MOD_ALT, ord('H'))
self.Bind(wx.EVT_HOTKEY, self.on_hotkey, id=HOTKEY_ID)
def on_hotkey(self, event):
self.ctrl.toggle()
def build_theme(self):
"""Build a small theme descriptor that respects the host platform."""
@@ -397,7 +224,6 @@ class IRCFrame(wx.Frame):
def show_privacy_notice(self):
"""Show privacy notice dialog at startup"""
dlg = PrivacyNoticeDialog(self)
dlg.ShowModal()
dlg.Destroy()
def get_user_color(self, username):
@@ -528,15 +354,31 @@ class IRCFrame(wx.Frame):
left_panel.SetSizer(left_sizer)
# Center - Notebook
self.notebook = wx.Notebook(panel)
self.notebook = wx.aui.AuiNotebook(panel, style=
wx.aui.AUI_NB_DEFAULT_STYLE |
wx.aui.AUI_NB_CLOSE_ON_ACTIVE_TAB |
wx.aui.AUI_NB_MIDDLE_CLICK_CLOSE
)
self.notebook.SetBackgroundColour(self.theme["content_bg"])
# Setup tab icons
self.setup_tab_icons()
# Bind close event
self.notebook.Bind(wx.aui.EVT_AUINOTEBOOK_PAGE_CLOSE, self.on_notebook_page_close)
# Server panel
server_panel = IRCPanel(self.notebook, self)
server_panel.set_target("SERVER")
idx = self.notebook.GetPageCount()
self.notebook.AddPage(server_panel, "Server")
# THIS is the missing line
if self.icon_server != -1:
self.notebook.SetPageImage(idx, self.icon_server)
self.channels["SERVER"] = server_panel
# Right sidebar - Users - light gray for contrast
right_panel = wx.Panel(panel)
@@ -595,8 +437,12 @@ class IRCFrame(wx.Frame):
file_menu = wx.Menu()
file_menu.Append(300, "&About", "About wxIRC Client")
file_menu.AppendSeparator()
self.sound_toggle_item = file_menu.AppendCheckItem(301, "Toggle &Sounds")
self.sound_toggle_item.Check(True) # Sounds enabled by default
file_menu.AppendSeparator()
file_menu.Append(wx.ID_EXIT, "E&xit\tCtrl+Q")
self.Bind(wx.EVT_MENU, self.on_about, id=300)
self.Bind(wx.EVT_MENU, self.on_toggle_sounds, id=301)
self.Bind(wx.EVT_MENU, self.on_close, id=wx.ID_EXIT)
# Edit menu with search
@@ -669,6 +515,11 @@ class IRCFrame(wx.Frame):
except Exception as e:
logger.error(f"Error creating menu: {e}")
def on_toggle_sounds(self, event):
"""Toggle sound effects on/off"""
self.sounds_enabled = self.sound_toggle_item.IsChecked()
logger.info(f"Sounds {'enabled' if self.sounds_enabled else 'disabled'}")
def on_about(self, event):
"""Show About dialog"""
try:
@@ -677,6 +528,11 @@ class IRCFrame(wx.Frame):
dlg.Destroy()
except Exception as e:
logger.error(f"Error showing about dialog: {e}")
if self.sound_handler and self.sounds_enabled:
try:
self.sound_handler.play_error_sound()
except:
pass
def on_notes(self, event):
"""Open notes editor dialog"""
@@ -697,6 +553,11 @@ class IRCFrame(wx.Frame):
except Exception as e:
logger.error(f"Error opening notes dialog: {e}")
self.log_server(f"Error opening notes: {e}", wx.Colour(255, 0, 0))
if self.sound_handler and self.sounds_enabled:
try:
self.sound_handler.play_error_sound()
except:
pass
def on_notes_closed(self, event=None):
"""Handle notes frame closing"""
@@ -862,6 +723,13 @@ class IRCFrame(wx.Frame):
self.connect_btn.Enable(True)
self.SetStatusText("Connection failed")
logger.error(f"Connection failed: {error_msg}")
# Play error sound
if self.sound_handler and self.sounds_enabled:
try:
self.sound_handler.play_error_sound()
except Exception as e:
logger.error(f"Error playing connection failure sound: {e}")
def disconnect(self):
"""Safely disconnect from IRC server"""
@@ -892,6 +760,13 @@ class IRCFrame(wx.Frame):
def on_disconnect_cleanup(self):
"""Clean up after disconnect"""
# Play disconnect notification sound
if self.sound_handler and self.sounds_enabled:
try:
self.sound_handler.play_warn_sound()
except Exception as e:
logger.error(f"Error playing disconnect sound: {e}")
with self.connection_lock:
self.connection = None
self.is_connecting = False
@@ -1146,11 +1021,18 @@ Available commands:
try:
if channel in self.channels and channel != "SERVER":
def _close_channel():
# Find and delete the page
for i in range(self.notebook.GetPageCount()):
if self.notebook.GetPageText(i) == channel:
self.notebook.DeletePage(i)
break
del self.channels[channel]
# Clean up channel data
if channel in self.channels:
del self.channels[channel]
if channel in self.channel_users:
del self.channel_users[channel]
idx = self.channel_list.FindString(channel)
if idx != wx.NOT_FOUND:
@@ -1159,7 +1041,7 @@ Available commands:
self.safe_ui_update(_close_channel)
except Exception as e:
logger.error(f"Error closing channel: {e}")
def log_server(self, message, color=None, bold=False, italic=False, underline=False):
try:
if "SERVER" in self.channels:
@@ -1180,8 +1062,11 @@ Available commands:
def log_channel_message(self, channel, username, message, is_action=False, is_system=False):
"""Log a message to a channel with username coloring"""
try:
# Don't create new channels if they don't exist and we're trying to log to them
if channel not in self.channels:
self.safe_ui_update(self.add_channel, channel)
# Only create channel if it's being opened by the user, not just receiving messages
return
if channel in self.channels:
timestamp = self.get_timestamp()
@@ -1204,12 +1089,140 @@ Available commands:
except Exception as e:
logger.error(f"Error logging channel message: {e}")
def setup_tab_icons(self):
try:
self.tab_image_list = wx.ImageList(32, 32)
# Icon file names to search for
icon_files = {
'server': 'server.ico',
'channel': 'channel.ico',
'query': 'channel.ico' # Reuse channel.ico for queries if no query.ico exists
}
# Search paths for icons
base_paths = [
os.path.dirname(__file__),
get_resource_path(""),
os.path.join(os.getcwd(), "src"),
]
# Try to load each icon
loaded_icons = {}
for icon_type, filename in icon_files.items():
icon_path = None
for base_path in base_paths:
test_path = os.path.join(base_path, filename)
if os.path.exists(test_path):
icon_path = test_path
break
if icon_path:
try:
img = wx.Image(icon_path, wx.BITMAP_TYPE_ICO)
img = img.Scale(32, 32, wx.IMAGE_QUALITY_HIGH)
bmp = wx.Bitmap(img)
loaded_icons[icon_type] = self.tab_image_list.Add(bmp)
logger.info(f"Loaded {icon_type} icon from {icon_path}")
except Exception as e:
logger.warning(f"Failed to load {icon_type} icon: {e}")
loaded_icons[icon_type] = -1
else:
logger.info(f"{filename} not found for {icon_type}")
loaded_icons[icon_type] = -1
# Assign icon indices
self.icon_server = loaded_icons.get('server', -1)
self.icon_channel = loaded_icons.get('channel', -1)
self.icon_query = loaded_icons.get('query', -1)
# Use fallback icons if any failed to load
if self.icon_server == -1 or self.icon_channel == -1 or self.icon_query == -1:
logger.info("Using fallback icons for missing icon files")
self._setup_fallback_icons()
self.notebook.SetImageList(self.tab_image_list)
except Exception as e:
logger.error(f"Error setting up tab icons: {e}")
self.icon_server = -1
self.icon_channel = -1
self.icon_query = -1
def _setup_fallback_icons(self):
"""Setup fallback icons using wx.ArtProvider."""
try:
self.icon_server = self.tab_image_list.Add(
wx.ArtProvider.GetBitmap(wx.ART_INFORMATION, wx.ART_OTHER, (48, 48))
)
self.icon_channel = self.tab_image_list.Add(
wx.ArtProvider.GetBitmap(wx.ART_NORMAL_FILE, wx.ART_OTHER, (48, 48))
)
self.icon_query = self.tab_image_list.Add(
wx.ArtProvider.GetBitmap(wx.ART_HELP, wx.ART_OTHER, (48, 48))
)
except Exception as e:
logger.error(f"Error setting up fallback icons: {e}")
self.icon_server = -1
self.icon_channel = -1
self.icon_query = -1
def on_notebook_page_close(self, event):
"""Handle tab close button clicks"""
try:
page_idx = event.GetSelection()
channel = self.notebook.GetPageText(page_idx)
if channel == "Server":
event.Veto()
wx.MessageBox("Can't close Server!", "Error", wx.OK | wx.ICON_ERROR)
return
if channel in self.channels:
del self.channels[channel]
if channel in self.channel_users:
del self.channel_users[channel]
# Remove from channel list
idx = self.channel_list.FindString(channel)
if idx != wx.NOT_FOUND:
self.channel_list.Delete(idx)
if channel.startswith('#') and self.is_connected():
try:
self.connection.part(channel)
except:
pass
# Allow the close to proceed
event.Skip()
except Exception as e:
logger.error(f"Error closing tab: {e}")
event.Skip()
def add_channel(self, channel):
"""Add a new channel/query tab with appropriate icon"""
try:
if channel not in self.channels:
panel = IRCPanel(self.notebook, self)
panel.set_target(channel)
self.notebook.AddPage(panel, channel)
# Determine icon based on channel type
if channel == "SERVER":
icon_idx = getattr(self, 'icon_server', -1)
elif channel.startswith('#'):
icon_idx = getattr(self, 'icon_channel', -1)
else:
icon_idx = getattr(self, 'icon_query', -1)
# Add page with icon (if icon_idx is -1, no icon will be shown)
if icon_idx >= 0:
self.notebook.AddPage(panel, channel, select=True, imageId=icon_idx)
else:
self.notebook.AddPage(panel, channel, select=True)
self.channels[channel] = panel
if channel.startswith('#'):
@@ -1226,6 +1239,11 @@ Available commands:
except Exception as e:
logger.error(f"Error opening scan wizard: {e}")
self.log_server(f"Scan wizard failed to open: {e}", wx.Colour(255, 0, 0))
if self.sound_handler and self.sounds_enabled:
try:
self.sound_handler.play_error_sound()
except:
pass
def quick_connect(self, server, port):
"""Populate connection fields and initiate a connection if idle."""
@@ -1434,6 +1452,13 @@ COMMANDS (type /help in chat for full list):
self.log_server("Connected to server!", wx.Colour(0, 128, 0), bold=True) # Dark green
self.log_server(f"Welcome message: {' '.join(event.arguments)}", wx.Colour(0, 100, 0))
# Play welcome/connection sound
if self.sound_handler and self.sounds_enabled:
try:
self.sound_handler.play_connect_server_or_channel()
except Exception as e:
logger.error(f"Error playing welcome sound: {e}")
# Auto-join channels
for channel in self.auto_join_channels:
if not channel.startswith('#'):
@@ -1452,6 +1477,13 @@ COMMANDS (type /help in chat for full list):
if nick == self.nick:
self.safe_ui_update(self.add_channel, channel)
self.log_server(f"Joined channel {channel}", wx.Colour(0, 128, 0)) # Dark green
# Play sound when we join a channel
if self.sound_handler and self.sounds_enabled:
try:
self.sound_handler.play_connect_server_or_channel()
except Exception as e:
logger.error(f"Error playing channel join sound: {e}")
self.log_channel_message(channel, nick, f"{nick} joined", is_system=True)
@@ -1509,6 +1541,13 @@ COMMANDS (type /help in chat for full list):
self.log_channel_message(channel, nick, message, is_action=True)
else:
self.log_channel_message(channel, nick, message)
# Play sound notification only for real user messages (not from self)
if self.sound_handler and self.sounds_enabled and nick.lower() != self.nick.lower():
try:
self.sound_handler.play_msg_recv()
except Exception as e:
logger.error(f"Error playing message sound: {e}")
# Highlight own nick in messages
if self.nick.lower() in message.lower():
@@ -1527,6 +1566,13 @@ COMMANDS (type /help in chat for full list):
self.log_channel_message(nick, nick, message, is_action=True)
else:
self.log_channel_message(nick, nick, message)
# Play mail sound for private messages
if self.sound_handler and self.sounds_enabled and nick.lower() != self.nick.lower():
try:
self.sound_handler.play_mail_sound()
except Exception as e:
logger.error(f"Error playing private message sound: {e}")
except Exception as e:
logger.error(f"Error in privmsg handler: {e}")
@@ -1728,14 +1774,12 @@ if os.name == 'nt':
ctypes.windll.user32.SetProcessDPIAware()
except:
pass
else:
pass
if __name__ == "__main__":
try:
if os.name == 'nt':
enable_high_dpi()
app = wx.App()
frame = IRCFrame()
frame.SetIcon(wx.Icon(get_resource_path("icon.ico"), wx.BITMAP_TYPE_ICO))

BIN
src/server.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 106 KiB

BIN
src/sounds/balloon.wav Normal file

Binary file not shown.

BIN
src/sounds/space-pdj.wav Normal file

Binary file not shown.

BIN
src/sounds/startup.wav Normal file

Binary file not shown.