Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 00c111da73 |
@@ -31,31 +31,20 @@ class AboutDialog(wx.Dialog):
|
|||||||
info_text = wx.StaticText(self, label="wxIRC Client")
|
info_text = wx.StaticText(self, label="wxIRC Client")
|
||||||
info_font = wx.Font(14, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)
|
info_font = wx.Font(14, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)
|
||||||
info_text.SetFont(info_font)
|
info_text.SetFont(info_font)
|
||||||
|
sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
|
||||||
|
|
||||||
import base64, hashlib
|
version_text = wx.StaticText(self, label="Version : 0.2.3") # COMMIT_HUNDRED:COMMIT_TEN:COMMIT_ONE e.g 23 commits -> 0.2.3
|
||||||
|
|
||||||
self.encoded = base64.b64encode(
|
|
||||||
hashlib.sha256(f"{self.GetId()}-{self.GetHandle()}".encode()).digest()
|
|
||||||
).decode('utf-8')
|
|
||||||
|
|
||||||
|
|
||||||
version_text = wx.StaticText(self, label=f"wxIRC V:e1")
|
|
||||||
version_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
|
version_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
|
||||||
version_text.SetFont(version_font)
|
version_text.SetFont(version_font)
|
||||||
|
|
||||||
|
|
||||||
rand_hash = wx.StaticText(self, label=f"{self.encoded}")
|
|
||||||
rand_hash.SetFont(version_font)
|
|
||||||
|
|
||||||
contrubutors_text = wx.StaticText(self, label="This software may not be used for commercial purposes. \n And may not be distributed for commercial purposes.")
|
|
||||||
contrubutors_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
|
|
||||||
contrubutors_text.SetFont(contrubutors_font)
|
|
||||||
|
|
||||||
# Add info to sizer
|
|
||||||
sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
|
|
||||||
sizer.Add(version_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
|
sizer.Add(version_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
|
||||||
sizer.Add(rand_hash, 0, wx.ALL | wx.ALIGN_CENTER, 5)
|
|
||||||
sizer.Add(contrubutors_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
|
contributors_text = wx.StaticText(self, label="""
|
||||||
|
Contributors:
|
||||||
|
* rattatwinko (rattatwinko.servecounterstrike.com)
|
||||||
|
""")
|
||||||
|
contributors_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
|
||||||
|
contributors_text.SetFont(contributors_font)
|
||||||
|
sizer.Add(contributors_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
|
||||||
|
|
||||||
# OK button
|
# OK button
|
||||||
ok_btn = wx.Button(self, wx.ID_OK, "OK")
|
ok_btn = wx.Button(self, wx.ID_OK, "OK")
|
||||||
|
|||||||
161
src/CommandAutocomplete.py
Normal file
161
src/CommandAutocomplete.py
Normal file
@@ -0,0 +1,161 @@
|
|||||||
|
import wx
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class CommandAutocomplete(wx.PopupTransientWindow):
|
||||||
|
"""Popup window for IRC command autocomplete, similar to Minecraft."""
|
||||||
|
def __init__(self, parent, commands, on_select_callback):
|
||||||
|
super().__init__(parent, wx.BORDER_SIMPLE)
|
||||||
|
self.on_select_callback = on_select_callback
|
||||||
|
self.commands = commands # List of (command, description) tuples
|
||||||
|
self.filtered_commands = commands.copy()
|
||||||
|
self.selected_index = 0
|
||||||
|
|
||||||
|
self._init_ui()
|
||||||
|
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
|
||||||
|
|
||||||
|
def _init_ui(self):
|
||||||
|
panel = wx.Panel(self)
|
||||||
|
main_sizer = wx.BoxSizer(wx.VERTICAL)
|
||||||
|
|
||||||
|
# Command list
|
||||||
|
self.list_ctrl = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER)
|
||||||
|
self.list_ctrl.InsertColumn(0, "Command", width=120)
|
||||||
|
self.list_ctrl.InsertColumn(1, "Description", width=280)
|
||||||
|
|
||||||
|
self._update_list()
|
||||||
|
|
||||||
|
# Bind events
|
||||||
|
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_item_activated)
|
||||||
|
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
|
||||||
|
self.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
|
||||||
|
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
|
||||||
|
|
||||||
|
main_sizer.Add(self.list_ctrl, 1, wx.EXPAND | wx.ALL, 2)
|
||||||
|
panel.SetSizer(main_sizer)
|
||||||
|
main_sizer.Fit(panel)
|
||||||
|
self.SetClientSize(panel.GetBestSize())
|
||||||
|
|
||||||
|
# Select first item
|
||||||
|
if self.list_ctrl.GetItemCount() > 0:
|
||||||
|
self.list_ctrl.Select(0)
|
||||||
|
self.selected_index = 0
|
||||||
|
|
||||||
|
def _update_list(self):
|
||||||
|
"""Update the list with filtered commands."""
|
||||||
|
self.list_ctrl.DeleteAllItems()
|
||||||
|
for cmd, desc in self.filtered_commands:
|
||||||
|
idx = self.list_ctrl.InsertItem(self.list_ctrl.GetItemCount(), cmd)
|
||||||
|
self.list_ctrl.SetItem(idx, 1, desc)
|
||||||
|
|
||||||
|
# Resize to fit content (max 8 items visible)
|
||||||
|
item_height = 20
|
||||||
|
max_items = min(8, len(self.filtered_commands))
|
||||||
|
self.list_ctrl.SetSize((410, item_height * max_items + 4))
|
||||||
|
self.SetClientSize((410, item_height * max_items + 4))
|
||||||
|
|
||||||
|
def filter_commands(self, search_text):
|
||||||
|
"""Filter commands based on search text."""
|
||||||
|
search_lower = search_text.lower().strip()
|
||||||
|
if not search_lower:
|
||||||
|
self.filtered_commands = self.commands.copy()
|
||||||
|
else:
|
||||||
|
self.filtered_commands = [
|
||||||
|
(cmd, desc) for cmd, desc in self.commands
|
||||||
|
if cmd.lower().startswith(search_lower)
|
||||||
|
]
|
||||||
|
|
||||||
|
self.selected_index = 0
|
||||||
|
self._update_list()
|
||||||
|
if self.list_ctrl.GetItemCount() > 0:
|
||||||
|
self.list_ctrl.Select(0)
|
||||||
|
|
||||||
|
def on_item_activated(self, event):
|
||||||
|
"""Handle double-click or Enter on item."""
|
||||||
|
idx = event.GetIndex()
|
||||||
|
if 0 <= idx < len(self.filtered_commands):
|
||||||
|
cmd, _ = self.filtered_commands[idx]
|
||||||
|
if self.on_select_callback:
|
||||||
|
self.on_select_callback(cmd)
|
||||||
|
self.safe_dismiss()
|
||||||
|
|
||||||
|
def on_item_selected(self, event):
|
||||||
|
"""Handle item selection."""
|
||||||
|
self.selected_index = event.GetIndex()
|
||||||
|
|
||||||
|
def select_next(self):
|
||||||
|
"""Select next item."""
|
||||||
|
if self.list_ctrl.GetItemCount() > 0:
|
||||||
|
self.selected_index = (self.selected_index + 1) % self.list_ctrl.GetItemCount()
|
||||||
|
self.list_ctrl.Select(self.selected_index)
|
||||||
|
self.list_ctrl.EnsureVisible(self.selected_index)
|
||||||
|
|
||||||
|
def select_previous(self):
|
||||||
|
"""Select previous item."""
|
||||||
|
if self.list_ctrl.GetItemCount() > 0:
|
||||||
|
self.selected_index = (self.selected_index - 1) % self.list_ctrl.GetItemCount()
|
||||||
|
self.list_ctrl.Select(self.selected_index)
|
||||||
|
self.list_ctrl.EnsureVisible(self.selected_index)
|
||||||
|
|
||||||
|
def get_selected_command(self):
|
||||||
|
"""Get the currently selected command."""
|
||||||
|
if 0 <= self.selected_index < len(self.filtered_commands):
|
||||||
|
return self.filtered_commands[self.selected_index][0]
|
||||||
|
return None
|
||||||
|
|
||||||
|
def on_char_hook(self, event):
|
||||||
|
"""Handle keyboard events."""
|
||||||
|
keycode = event.GetKeyCode()
|
||||||
|
if keycode == wx.WXK_ESCAPE:
|
||||||
|
self.safe_dismiss()
|
||||||
|
elif keycode == wx.WXK_UP:
|
||||||
|
self.select_previous()
|
||||||
|
elif keycode == wx.WXK_DOWN:
|
||||||
|
self.select_next()
|
||||||
|
elif keycode == wx.WXK_RETURN or keycode == wx.WXK_NUMPAD_ENTER:
|
||||||
|
cmd = self.get_selected_command()
|
||||||
|
if cmd and self.on_select_callback:
|
||||||
|
self.on_select_callback(cmd)
|
||||||
|
self.safe_dismiss()
|
||||||
|
else:
|
||||||
|
event.Skip()
|
||||||
|
|
||||||
|
def on_kill_focus(self, event):
|
||||||
|
"""Handle focus loss."""
|
||||||
|
focused = wx.Window.FindFocus()
|
||||||
|
if focused and (focused == self.list_ctrl or self.IsDescendant(focused)):
|
||||||
|
event.Skip()
|
||||||
|
return
|
||||||
|
wx.CallLater(100, self.safe_dismiss)
|
||||||
|
event.Skip()
|
||||||
|
|
||||||
|
def on_destroy(self, event):
|
||||||
|
event.Skip()
|
||||||
|
|
||||||
|
def safe_dismiss(self):
|
||||||
|
"""Safely dismiss the popup."""
|
||||||
|
if not self.IsBeingDeleted() and self.IsShown():
|
||||||
|
try:
|
||||||
|
self.Dismiss()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error dismissing command autocomplete: {e}")
|
||||||
|
|
||||||
|
def show_at_input(self, input_ctrl):
|
||||||
|
"""Show popup near the input control."""
|
||||||
|
input_screen_pos = input_ctrl.ClientToScreen((0, 0))
|
||||||
|
popup_size = self.GetSize()
|
||||||
|
display_size = wx.GetDisplaySize()
|
||||||
|
|
||||||
|
# Show above the input, otherwise below
|
||||||
|
if input_screen_pos.y - popup_size.height > 0:
|
||||||
|
popup_y = input_screen_pos.y - popup_size.height - 2
|
||||||
|
else:
|
||||||
|
popup_y = input_screen_pos.y + input_ctrl.GetSize().height + 2
|
||||||
|
|
||||||
|
# Keep popup on screen horizontally
|
||||||
|
popup_x = max(10, min(input_screen_pos.x, display_size.x - popup_size.width - 10))
|
||||||
|
|
||||||
|
self.Position((popup_x, popup_y), (0, 0))
|
||||||
|
self.Popup()
|
||||||
355
src/IRCPanel.py
355
src/IRCPanel.py
@@ -5,358 +5,13 @@ import logging
|
|||||||
from SearchDialog import SearchDialog
|
from SearchDialog import SearchDialog
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
import KaomojiPicker as KaomojiPicker
|
||||||
|
import CommandAutocomplete as CommandAutocomplete
|
||||||
|
|
||||||
# Set up logging
|
# Set up logging
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class KaomojiPicker(wx.PopupTransientWindow):
|
|
||||||
def __init__(self, parent, kaomoji_groups, on_select_callback):
|
|
||||||
super().__init__(parent, wx.BORDER_SIMPLE)
|
|
||||||
self.on_select_callback = on_select_callback
|
|
||||||
self.kaomoji_groups = kaomoji_groups
|
|
||||||
|
|
||||||
self._init_ui()
|
|
||||||
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
|
|
||||||
|
|
||||||
def _init_ui(self):
|
|
||||||
panel = wx.Panel(self)
|
|
||||||
main_sizer = wx.BoxSizer(wx.VERTICAL)
|
|
||||||
|
|
||||||
# Scrolled content area
|
|
||||||
self.scroll = wx.ScrolledWindow(panel, size=(380, 420), style=wx.VSCROLL)
|
|
||||||
self.scroll.SetScrollRate(0, 15)
|
|
||||||
self.scroll_sizer = wx.BoxSizer(wx.VERTICAL)
|
|
||||||
|
|
||||||
# Storage for filtering
|
|
||||||
self.all_buttons = []
|
|
||||||
self.group_headers = {}
|
|
||||||
self.group_containers = {}
|
|
||||||
|
|
||||||
# Build kaomoji groups
|
|
||||||
self._build_kaomoji_groups()
|
|
||||||
|
|
||||||
self.scroll.SetSizer(self.scroll_sizer)
|
|
||||||
main_sizer.Add(self.scroll, 1, wx.EXPAND)
|
|
||||||
|
|
||||||
panel.SetSizer(main_sizer)
|
|
||||||
main_sizer.Fit(panel)
|
|
||||||
self.SetClientSize(panel.GetBestSize())
|
|
||||||
|
|
||||||
# Bind events
|
|
||||||
self._bind_events(panel)
|
|
||||||
|
|
||||||
def _build_kaomoji_groups(self):
|
|
||||||
dc = wx.ClientDC(self.scroll)
|
|
||||||
dc.SetFont(self.scroll.GetFont())
|
|
||||||
|
|
||||||
self.scroll_sizer.AddSpacer(8)
|
|
||||||
|
|
||||||
for group_name, kaomojis in self.kaomoji_groups.items():
|
|
||||||
if not kaomojis:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Group header
|
|
||||||
header = self._create_group_header(group_name)
|
|
||||||
self.group_headers[group_name] = header
|
|
||||||
self.scroll_sizer.Add(header, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 12)
|
|
||||||
|
|
||||||
# Wrap sizer for buttons
|
|
||||||
wrap_sizer = wx.WrapSizer(wx.HORIZONTAL)
|
|
||||||
|
|
||||||
for kaomoji in kaomojis:
|
|
||||||
btn = self._create_kaomoji_button(kaomoji, dc, group_name, wrap_sizer)
|
|
||||||
self.all_buttons.append(btn)
|
|
||||||
wrap_sizer.Add(btn, 0, wx.ALL, 3)
|
|
||||||
|
|
||||||
self.group_containers[group_name] = wrap_sizer
|
|
||||||
self.scroll_sizer.Add(wrap_sizer, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 12)
|
|
||||||
|
|
||||||
def _create_group_header(self, group_name):
|
|
||||||
header = wx.StaticText(self.scroll, label=group_name)
|
|
||||||
font = header.GetFont()
|
|
||||||
font.SetWeight(wx.FONTWEIGHT_BOLD)
|
|
||||||
font.SetPointSize(9)
|
|
||||||
header.SetFont(font)
|
|
||||||
return header
|
|
||||||
|
|
||||||
def _create_kaomoji_button(self, kaomoji, dc, group_name, wrap_sizer):
|
|
||||||
text_width, text_height = dc.GetTextExtent(kaomoji)
|
|
||||||
btn_width = min(max(text_width + 20, 55), 140)
|
|
||||||
btn_height = text_height + 14
|
|
||||||
|
|
||||||
btn = wx.Button(self.scroll, label=kaomoji, size=(btn_width, btn_height))
|
|
||||||
btn.SetToolTip(f"{kaomoji} - {group_name}")
|
|
||||||
|
|
||||||
btn._kaomoji_text = kaomoji.lower()
|
|
||||||
btn._kaomoji_group = group_name.lower()
|
|
||||||
btn._group_name = group_name
|
|
||||||
|
|
||||||
# Bind click event
|
|
||||||
btn.Bind(wx.EVT_BUTTON, lambda evt, k=kaomoji: self.on_kaomoji_selected(k))
|
|
||||||
|
|
||||||
return btn
|
|
||||||
|
|
||||||
def _bind_events(self, panel):
|
|
||||||
panel.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
|
|
||||||
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
|
|
||||||
|
|
||||||
def on_kaomoji_selected(self, kaomoji):
|
|
||||||
try:
|
|
||||||
if self.on_select_callback:
|
|
||||||
self.on_select_callback(kaomoji)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error in kaomoji selection callback: {e}")
|
|
||||||
finally:
|
|
||||||
# Safely dismiss the popup
|
|
||||||
wx.CallAfter(self.safe_dismiss)
|
|
||||||
|
|
||||||
def on_search(self, event):
|
|
||||||
try:
|
|
||||||
search_text = self.search_ctrl.GetValue().lower().strip()
|
|
||||||
|
|
||||||
if not search_text:
|
|
||||||
self._show_all()
|
|
||||||
return
|
|
||||||
|
|
||||||
visible_groups = set()
|
|
||||||
|
|
||||||
for btn in self.all_buttons:
|
|
||||||
# Match kaomoji text or group name
|
|
||||||
is_match = (search_text in btn._kaomoji_text or
|
|
||||||
search_text in btn._kaomoji_group)
|
|
||||||
btn.Show(is_match)
|
|
||||||
|
|
||||||
if is_match:
|
|
||||||
visible_groups.add(btn._group_name)
|
|
||||||
|
|
||||||
# Show/hide group headers
|
|
||||||
for group_name, header in self.group_headers.items():
|
|
||||||
header.Show(group_name in visible_groups)
|
|
||||||
|
|
||||||
self.scroll.Layout()
|
|
||||||
self.scroll.FitInside()
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error in kaomoji search: {e}")
|
|
||||||
|
|
||||||
def on_clear_search(self, event):
|
|
||||||
self.search_ctrl.SetValue("")
|
|
||||||
self._show_all()
|
|
||||||
|
|
||||||
def _show_all(self):
|
|
||||||
for btn in self.all_buttons:
|
|
||||||
btn.Show()
|
|
||||||
for header in self.group_headers.values():
|
|
||||||
header.Show()
|
|
||||||
self.scroll.Layout()
|
|
||||||
self.scroll.FitInside()
|
|
||||||
|
|
||||||
def on_char_hook(self, event):
|
|
||||||
if event.GetKeyCode() == wx.WXK_ESCAPE:
|
|
||||||
self.safe_dismiss()
|
|
||||||
else:
|
|
||||||
event.Skip()
|
|
||||||
|
|
||||||
def on_kill_focus(self, event):
|
|
||||||
focused = wx.Window.FindFocus()
|
|
||||||
if focused and (focused == self.search_ctrl or
|
|
||||||
focused.GetParent() == self.scroll or
|
|
||||||
self.scroll.IsDescendant(focused)):
|
|
||||||
event.Skip()
|
|
||||||
return
|
|
||||||
|
|
||||||
wx.CallLater(100, self.safe_dismiss)
|
|
||||||
event.Skip()
|
|
||||||
|
|
||||||
def on_destroy(self, event):
|
|
||||||
self.all_buttons.clear()
|
|
||||||
self.group_headers.clear()
|
|
||||||
self.group_containers.clear()
|
|
||||||
event.Skip()
|
|
||||||
|
|
||||||
def safe_dismiss(self):
|
|
||||||
if not self.IsBeingDeleted() and self.IsShown():
|
|
||||||
try:
|
|
||||||
self.Dismiss()
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error dismissing popup: {e}")
|
|
||||||
|
|
||||||
def show_at_button(self, button):
|
|
||||||
btn_screen_pos = button.ClientToScreen((0, 0))
|
|
||||||
popup_size = self.GetSize()
|
|
||||||
display_size = wx.GetDisplaySize()
|
|
||||||
|
|
||||||
# Try to show above the button, otherwise below
|
|
||||||
if btn_screen_pos.y - popup_size.height > 0:
|
|
||||||
popup_y = btn_screen_pos.y - popup_size.height
|
|
||||||
else:
|
|
||||||
popup_y = btn_screen_pos.y + button.GetSize().height
|
|
||||||
|
|
||||||
# Keep popup on screen horizontally
|
|
||||||
popup_x = max(10, min(btn_screen_pos.x, display_size.x - popup_size.width - 10))
|
|
||||||
|
|
||||||
self.Position((popup_x, popup_y), (0, 0))
|
|
||||||
self.Popup()
|
|
||||||
|
|
||||||
|
|
||||||
class CommandAutocomplete(wx.PopupTransientWindow):
|
|
||||||
"""Popup window for IRC command autocomplete, similar to Minecraft."""
|
|
||||||
def __init__(self, parent, commands, on_select_callback):
|
|
||||||
super().__init__(parent, wx.BORDER_SIMPLE)
|
|
||||||
self.on_select_callback = on_select_callback
|
|
||||||
self.commands = commands # List of (command, description) tuples
|
|
||||||
self.filtered_commands = commands.copy()
|
|
||||||
self.selected_index = 0
|
|
||||||
|
|
||||||
self._init_ui()
|
|
||||||
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
|
|
||||||
|
|
||||||
def _init_ui(self):
|
|
||||||
panel = wx.Panel(self)
|
|
||||||
main_sizer = wx.BoxSizer(wx.VERTICAL)
|
|
||||||
|
|
||||||
# Command list
|
|
||||||
self.list_ctrl = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.LC_SINGLE_SEL | wx.LC_NO_HEADER)
|
|
||||||
self.list_ctrl.InsertColumn(0, "Command", width=120)
|
|
||||||
self.list_ctrl.InsertColumn(1, "Description", width=280)
|
|
||||||
|
|
||||||
self._update_list()
|
|
||||||
|
|
||||||
# Bind events
|
|
||||||
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_item_activated)
|
|
||||||
self.list_ctrl.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_item_selected)
|
|
||||||
self.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
|
|
||||||
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
|
|
||||||
|
|
||||||
main_sizer.Add(self.list_ctrl, 1, wx.EXPAND | wx.ALL, 2)
|
|
||||||
panel.SetSizer(main_sizer)
|
|
||||||
main_sizer.Fit(panel)
|
|
||||||
self.SetClientSize(panel.GetBestSize())
|
|
||||||
|
|
||||||
# Select first item
|
|
||||||
if self.list_ctrl.GetItemCount() > 0:
|
|
||||||
self.list_ctrl.Select(0)
|
|
||||||
self.selected_index = 0
|
|
||||||
|
|
||||||
def _update_list(self):
|
|
||||||
"""Update the list with filtered commands."""
|
|
||||||
self.list_ctrl.DeleteAllItems()
|
|
||||||
for cmd, desc in self.filtered_commands:
|
|
||||||
idx = self.list_ctrl.InsertItem(self.list_ctrl.GetItemCount(), cmd)
|
|
||||||
self.list_ctrl.SetItem(idx, 1, desc)
|
|
||||||
|
|
||||||
# Resize to fit content (max 8 items visible)
|
|
||||||
item_height = 20
|
|
||||||
max_items = min(8, len(self.filtered_commands))
|
|
||||||
self.list_ctrl.SetSize((410, item_height * max_items + 4))
|
|
||||||
self.SetClientSize((410, item_height * max_items + 4))
|
|
||||||
|
|
||||||
def filter_commands(self, search_text):
|
|
||||||
"""Filter commands based on search text."""
|
|
||||||
search_lower = search_text.lower().strip()
|
|
||||||
if not search_lower:
|
|
||||||
self.filtered_commands = self.commands.copy()
|
|
||||||
else:
|
|
||||||
self.filtered_commands = [
|
|
||||||
(cmd, desc) for cmd, desc in self.commands
|
|
||||||
if cmd.lower().startswith(search_lower)
|
|
||||||
]
|
|
||||||
|
|
||||||
self.selected_index = 0
|
|
||||||
self._update_list()
|
|
||||||
if self.list_ctrl.GetItemCount() > 0:
|
|
||||||
self.list_ctrl.Select(0)
|
|
||||||
|
|
||||||
def on_item_activated(self, event):
|
|
||||||
"""Handle double-click or Enter on item."""
|
|
||||||
idx = event.GetIndex()
|
|
||||||
if 0 <= idx < len(self.filtered_commands):
|
|
||||||
cmd, _ = self.filtered_commands[idx]
|
|
||||||
if self.on_select_callback:
|
|
||||||
self.on_select_callback(cmd)
|
|
||||||
self.safe_dismiss()
|
|
||||||
|
|
||||||
def on_item_selected(self, event):
|
|
||||||
"""Handle item selection."""
|
|
||||||
self.selected_index = event.GetIndex()
|
|
||||||
|
|
||||||
def select_next(self):
|
|
||||||
"""Select next item."""
|
|
||||||
if self.list_ctrl.GetItemCount() > 0:
|
|
||||||
self.selected_index = (self.selected_index + 1) % self.list_ctrl.GetItemCount()
|
|
||||||
self.list_ctrl.Select(self.selected_index)
|
|
||||||
self.list_ctrl.EnsureVisible(self.selected_index)
|
|
||||||
|
|
||||||
def select_previous(self):
|
|
||||||
"""Select previous item."""
|
|
||||||
if self.list_ctrl.GetItemCount() > 0:
|
|
||||||
self.selected_index = (self.selected_index - 1) % self.list_ctrl.GetItemCount()
|
|
||||||
self.list_ctrl.Select(self.selected_index)
|
|
||||||
self.list_ctrl.EnsureVisible(self.selected_index)
|
|
||||||
|
|
||||||
def get_selected_command(self):
|
|
||||||
"""Get the currently selected command."""
|
|
||||||
if 0 <= self.selected_index < len(self.filtered_commands):
|
|
||||||
return self.filtered_commands[self.selected_index][0]
|
|
||||||
return None
|
|
||||||
|
|
||||||
def on_char_hook(self, event):
|
|
||||||
"""Handle keyboard events."""
|
|
||||||
keycode = event.GetKeyCode()
|
|
||||||
if keycode == wx.WXK_ESCAPE:
|
|
||||||
self.safe_dismiss()
|
|
||||||
elif keycode == wx.WXK_UP:
|
|
||||||
self.select_previous()
|
|
||||||
elif keycode == wx.WXK_DOWN:
|
|
||||||
self.select_next()
|
|
||||||
elif keycode == wx.WXK_RETURN or keycode == wx.WXK_NUMPAD_ENTER:
|
|
||||||
cmd = self.get_selected_command()
|
|
||||||
if cmd and self.on_select_callback:
|
|
||||||
self.on_select_callback(cmd)
|
|
||||||
self.safe_dismiss()
|
|
||||||
else:
|
|
||||||
event.Skip()
|
|
||||||
|
|
||||||
def on_kill_focus(self, event):
|
|
||||||
"""Handle focus loss."""
|
|
||||||
focused = wx.Window.FindFocus()
|
|
||||||
if focused and (focused == self.list_ctrl or self.IsDescendant(focused)):
|
|
||||||
event.Skip()
|
|
||||||
return
|
|
||||||
wx.CallLater(100, self.safe_dismiss)
|
|
||||||
event.Skip()
|
|
||||||
|
|
||||||
def on_destroy(self, event):
|
|
||||||
event.Skip()
|
|
||||||
|
|
||||||
def safe_dismiss(self):
|
|
||||||
"""Safely dismiss the popup."""
|
|
||||||
if not self.IsBeingDeleted() and self.IsShown():
|
|
||||||
try:
|
|
||||||
self.Dismiss()
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error dismissing command autocomplete: {e}")
|
|
||||||
|
|
||||||
def show_at_input(self, input_ctrl):
|
|
||||||
"""Show popup near the input control."""
|
|
||||||
input_screen_pos = input_ctrl.ClientToScreen((0, 0))
|
|
||||||
popup_size = self.GetSize()
|
|
||||||
display_size = wx.GetDisplaySize()
|
|
||||||
|
|
||||||
# Show above the input, otherwise below
|
|
||||||
if input_screen_pos.y - popup_size.height > 0:
|
|
||||||
popup_y = input_screen_pos.y - popup_size.height - 2
|
|
||||||
else:
|
|
||||||
popup_y = input_screen_pos.y + input_ctrl.GetSize().height + 2
|
|
||||||
|
|
||||||
# Keep popup on screen horizontally
|
|
||||||
popup_x = max(10, min(input_screen_pos.x, display_size.x - popup_size.width - 10))
|
|
||||||
|
|
||||||
self.Position((popup_x, popup_y), (0, 0))
|
|
||||||
self.Popup()
|
|
||||||
|
|
||||||
|
|
||||||
class IRCPanel(wx.Panel):
|
class IRCPanel(wx.Panel):
|
||||||
# IRC commands with descriptions
|
# IRC commands with descriptions
|
||||||
IRC_COMMANDS = [
|
IRC_COMMANDS = [
|
||||||
@@ -673,7 +328,7 @@ class IRCPanel(wx.Panel):
|
|||||||
self.current_popup.Dismiss()
|
self.current_popup.Dismiss()
|
||||||
|
|
||||||
# Create new popup
|
# Create new popup
|
||||||
self.current_popup = KaomojiPicker(
|
self.current_popup = KaomojiPicker.KaomojiPicker(
|
||||||
self,
|
self,
|
||||||
self.KAOMOJI_GROUPS,
|
self.KAOMOJI_GROUPS,
|
||||||
self.on_kaomoji_insert
|
self.on_kaomoji_insert
|
||||||
@@ -746,7 +401,7 @@ class IRCPanel(wx.Panel):
|
|||||||
try:
|
try:
|
||||||
if not self.command_popup or self.command_popup.IsBeingDeleted():
|
if not self.command_popup or self.command_popup.IsBeingDeleted():
|
||||||
# Create new popup
|
# Create new popup
|
||||||
self.command_popup = CommandAutocomplete(
|
self.command_popup = CommandAutocomplete.CommandAutocomplete( # fuckass python
|
||||||
self,
|
self,
|
||||||
self.IRC_COMMANDS,
|
self.IRC_COMMANDS,
|
||||||
self.on_command_select
|
self.on_command_select
|
||||||
|
|||||||
103
src/InterfaceSelectDialog.py
Normal file
103
src/InterfaceSelectDialog.py
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
import wx
|
||||||
|
import socket
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class InterfaceSelectDialog(wx.Dialog):
|
||||||
|
"""Dialog that lets the user pick which local interface the server should bind to."""
|
||||||
|
|
||||||
|
def __init__(self, parent, current_host="127.0.0.1"):
|
||||||
|
super().__init__(parent, title="Select Network Interface", size=(420, 380))
|
||||||
|
try:
|
||||||
|
self.SetIcon(parent.GetIcon())
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.selected_host = current_host
|
||||||
|
panel = wx.Panel(self)
|
||||||
|
panel.SetBackgroundColour(parent.theme["window_bg"])
|
||||||
|
main_sizer = wx.BoxSizer(wx.VERTICAL)
|
||||||
|
|
||||||
|
info = wx.StaticText(
|
||||||
|
panel,
|
||||||
|
label="Choose the Network interface where your server should run:\n"
|
||||||
|
"You are EXPOSING a Server to your LOCAL Network, this may give away who you are!\n",
|
||||||
|
)
|
||||||
|
info.Wrap(380)
|
||||||
|
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 10)
|
||||||
|
|
||||||
|
self.interface_list = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
|
||||||
|
self.interface_list.InsertColumn(0, "Interface", width=180)
|
||||||
|
self.interface_list.InsertColumn(1, "Address", width=180)
|
||||||
|
|
||||||
|
self.interfaces = self._gather_interfaces()
|
||||||
|
current_index = 0
|
||||||
|
for idx, entry in enumerate[tuple[str, str]](self.interfaces):
|
||||||
|
name, address = entry
|
||||||
|
self.interface_list.InsertItem(idx, name)
|
||||||
|
self.interface_list.SetItem(idx, 1, address)
|
||||||
|
if address == current_host:
|
||||||
|
current_index = idx
|
||||||
|
self.interface_list.Select(current_index)
|
||||||
|
self.interface_list.EnsureVisible(current_index)
|
||||||
|
if self.interfaces:
|
||||||
|
self.selected_host = self.interfaces[current_index][1]
|
||||||
|
self.interface_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_select)
|
||||||
|
self.interface_list.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_activate)
|
||||||
|
main_sizer.Add(self.interface_list, 1, wx.ALL | wx.EXPAND, 10)
|
||||||
|
|
||||||
|
button_bar = wx.StdDialogButtonSizer()
|
||||||
|
ok_btn = wx.Button(panel, wx.ID_OK)
|
||||||
|
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
|
||||||
|
button_bar.AddButton(ok_btn)
|
||||||
|
button_bar.AddButton(cancel_btn)
|
||||||
|
button_bar.Realize()
|
||||||
|
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 10)
|
||||||
|
|
||||||
|
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
|
||||||
|
|
||||||
|
panel.SetSizer(main_sizer)
|
||||||
|
|
||||||
|
def on_select(self, event):
|
||||||
|
index = event.GetIndex()
|
||||||
|
_, address = self.interfaces[index]
|
||||||
|
self.selected_host = address
|
||||||
|
|
||||||
|
def _gather_interfaces(self):
|
||||||
|
entries = [
|
||||||
|
("Loopback only", "127.0.0.1"),
|
||||||
|
("All interfaces", "0.0.0.0"),
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
import psutil
|
||||||
|
|
||||||
|
seen = {addr for _, addr in entries}
|
||||||
|
for name, addrs in psutil.net_if_addrs().items():
|
||||||
|
for addr in addrs:
|
||||||
|
if addr.family == socket.AF_INET and addr.address not in seen:
|
||||||
|
label = f"{name}"
|
||||||
|
entries.append((label, addr.address))
|
||||||
|
seen.add(addr.address)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Unable to enumerate network interfaces: {e}")
|
||||||
|
|
||||||
|
return entries
|
||||||
|
|
||||||
|
def get_selected_host(self):
|
||||||
|
return self.selected_host
|
||||||
|
|
||||||
|
def on_activate(self, event):
|
||||||
|
self.on_select(event)
|
||||||
|
self.EndModal(wx.ID_OK)
|
||||||
|
|
||||||
|
def on_ok(self, event):
|
||||||
|
index = self.interface_list.GetFirstSelected()
|
||||||
|
if index == -1 and self.interfaces:
|
||||||
|
wx.MessageBox("Select an interface before starting the server.", "No Interface Selected", wx.OK | wx.ICON_INFORMATION)
|
||||||
|
return
|
||||||
|
if index != -1:
|
||||||
|
_, address = self.interfaces[index]
|
||||||
|
self.selected_host = address
|
||||||
|
event.Skip()
|
||||||
|
|
||||||
197
src/KaomojiPicker.py
Normal file
197
src/KaomojiPicker.py
Normal file
@@ -0,0 +1,197 @@
|
|||||||
|
import logging
|
||||||
|
import traceback
|
||||||
|
import wx
|
||||||
|
import wx.adv
|
||||||
|
|
||||||
|
# Set up logging
|
||||||
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class KaomojiPicker(wx.PopupTransientWindow):
|
||||||
|
def __init__(self, parent, kaomoji_groups, on_select_callback):
|
||||||
|
super().__init__(parent, wx.BORDER_SIMPLE)
|
||||||
|
self.on_select_callback = on_select_callback
|
||||||
|
self.kaomoji_groups = kaomoji_groups
|
||||||
|
|
||||||
|
self._init_ui()
|
||||||
|
self.Bind(wx.EVT_WINDOW_DESTROY, self.on_destroy)
|
||||||
|
|
||||||
|
def _init_ui(self):
|
||||||
|
panel = wx.Panel(self)
|
||||||
|
main_sizer = wx.BoxSizer(wx.VERTICAL)
|
||||||
|
|
||||||
|
# Scrolled content area
|
||||||
|
self.scroll = wx.ScrolledWindow(panel, size=(380, 420), style=wx.VSCROLL)
|
||||||
|
self.scroll.SetScrollRate(0, 15)
|
||||||
|
self.scroll_sizer = wx.BoxSizer(wx.VERTICAL)
|
||||||
|
|
||||||
|
# Storage for filtering
|
||||||
|
self.all_buttons = []
|
||||||
|
self.group_headers = {}
|
||||||
|
self.group_containers = {}
|
||||||
|
|
||||||
|
# Build kaomoji groups
|
||||||
|
self._build_kaomoji_groups()
|
||||||
|
|
||||||
|
self.scroll.SetSizer(self.scroll_sizer)
|
||||||
|
main_sizer.Add(self.scroll, 1, wx.EXPAND)
|
||||||
|
|
||||||
|
panel.SetSizer(main_sizer)
|
||||||
|
main_sizer.Fit(panel)
|
||||||
|
self.SetClientSize(panel.GetBestSize())
|
||||||
|
|
||||||
|
# Bind events
|
||||||
|
self._bind_events(panel)
|
||||||
|
|
||||||
|
def _build_kaomoji_groups(self):
|
||||||
|
dc = wx.ClientDC(self.scroll)
|
||||||
|
dc.SetFont(self.scroll.GetFont())
|
||||||
|
|
||||||
|
self.scroll_sizer.AddSpacer(8)
|
||||||
|
|
||||||
|
for group_name, kaomojis in self.kaomoji_groups.items():
|
||||||
|
if not kaomojis:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Group header
|
||||||
|
header = self._create_group_header(group_name)
|
||||||
|
self.group_headers[group_name] = header
|
||||||
|
self.scroll_sizer.Add(header, 0, wx.LEFT | wx.TOP | wx.BOTTOM, 12)
|
||||||
|
|
||||||
|
# Wrap sizer for buttons
|
||||||
|
wrap_sizer = wx.WrapSizer(wx.HORIZONTAL)
|
||||||
|
|
||||||
|
for kaomoji in kaomojis:
|
||||||
|
btn = self._create_kaomoji_button(kaomoji, dc, group_name, wrap_sizer)
|
||||||
|
self.all_buttons.append(btn)
|
||||||
|
wrap_sizer.Add(btn, 0, wx.ALL, 3)
|
||||||
|
|
||||||
|
self.group_containers[group_name] = wrap_sizer
|
||||||
|
self.scroll_sizer.Add(wrap_sizer, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 12)
|
||||||
|
|
||||||
|
def _create_group_header(self, group_name):
|
||||||
|
header = wx.StaticText(self.scroll, label=group_name)
|
||||||
|
font = header.GetFont()
|
||||||
|
font.SetWeight(wx.FONTWEIGHT_BOLD)
|
||||||
|
font.SetPointSize(9)
|
||||||
|
header.SetFont(font)
|
||||||
|
return header
|
||||||
|
|
||||||
|
def _create_kaomoji_button(self, kaomoji, dc, group_name, wrap_sizer):
|
||||||
|
text_width, text_height = dc.GetTextExtent(kaomoji)
|
||||||
|
btn_width = min(max(text_width + 20, 55), 140)
|
||||||
|
btn_height = text_height + 14
|
||||||
|
|
||||||
|
btn = wx.Button(self.scroll, label=kaomoji, size=(btn_width, btn_height))
|
||||||
|
btn.SetToolTip(f"{kaomoji} - {group_name}")
|
||||||
|
|
||||||
|
btn._kaomoji_text = kaomoji.lower()
|
||||||
|
btn._kaomoji_group = group_name.lower()
|
||||||
|
btn._group_name = group_name
|
||||||
|
|
||||||
|
# Bind click event
|
||||||
|
btn.Bind(wx.EVT_BUTTON, lambda evt, k=kaomoji: self.on_kaomoji_selected(k))
|
||||||
|
|
||||||
|
return btn
|
||||||
|
|
||||||
|
def _bind_events(self, panel):
|
||||||
|
panel.Bind(wx.EVT_CHAR_HOOK, self.on_char_hook)
|
||||||
|
self.Bind(wx.EVT_KILL_FOCUS, self.on_kill_focus)
|
||||||
|
|
||||||
|
def on_kaomoji_selected(self, kaomoji):
|
||||||
|
try:
|
||||||
|
if self.on_select_callback:
|
||||||
|
self.on_select_callback(kaomoji)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in kaomoji selection callback: {e}")
|
||||||
|
finally:
|
||||||
|
# Safely dismiss the popup
|
||||||
|
wx.CallAfter(self.safe_dismiss)
|
||||||
|
|
||||||
|
def on_search(self, event):
|
||||||
|
try:
|
||||||
|
search_text = self.search_ctrl.GetValue().lower().strip()
|
||||||
|
|
||||||
|
if not search_text:
|
||||||
|
self._show_all()
|
||||||
|
return
|
||||||
|
|
||||||
|
visible_groups = set()
|
||||||
|
|
||||||
|
for btn in self.all_buttons:
|
||||||
|
# Match kaomoji text or group name
|
||||||
|
is_match = (search_text in btn._kaomoji_text or
|
||||||
|
search_text in btn._kaomoji_group)
|
||||||
|
btn.Show(is_match)
|
||||||
|
|
||||||
|
if is_match:
|
||||||
|
visible_groups.add(btn._group_name)
|
||||||
|
|
||||||
|
# Show/hide group headers
|
||||||
|
for group_name, header in self.group_headers.items():
|
||||||
|
header.Show(group_name in visible_groups)
|
||||||
|
|
||||||
|
self.scroll.Layout()
|
||||||
|
self.scroll.FitInside()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in kaomoji search: {e}")
|
||||||
|
|
||||||
|
def on_clear_search(self, event):
|
||||||
|
self.search_ctrl.SetValue("")
|
||||||
|
self._show_all()
|
||||||
|
|
||||||
|
def _show_all(self):
|
||||||
|
for btn in self.all_buttons:
|
||||||
|
btn.Show()
|
||||||
|
for header in self.group_headers.values():
|
||||||
|
header.Show()
|
||||||
|
self.scroll.Layout()
|
||||||
|
self.scroll.FitInside()
|
||||||
|
|
||||||
|
def on_char_hook(self, event):
|
||||||
|
if event.GetKeyCode() == wx.WXK_ESCAPE:
|
||||||
|
self.safe_dismiss()
|
||||||
|
else:
|
||||||
|
event.Skip()
|
||||||
|
|
||||||
|
def on_kill_focus(self, event):
|
||||||
|
focused = wx.Window.FindFocus()
|
||||||
|
if focused and (focused == self.search_ctrl or
|
||||||
|
focused.GetParent() == self.scroll or
|
||||||
|
self.scroll.IsDescendant(focused)):
|
||||||
|
event.Skip()
|
||||||
|
return
|
||||||
|
|
||||||
|
wx.CallLater(100, self.safe_dismiss)
|
||||||
|
event.Skip()
|
||||||
|
|
||||||
|
def on_destroy(self, event):
|
||||||
|
self.all_buttons.clear()
|
||||||
|
self.group_headers.clear()
|
||||||
|
self.group_containers.clear()
|
||||||
|
event.Skip()
|
||||||
|
|
||||||
|
def safe_dismiss(self):
|
||||||
|
if not self.IsBeingDeleted() and self.IsShown():
|
||||||
|
try:
|
||||||
|
self.Dismiss()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error dismissing popup: {e}")
|
||||||
|
|
||||||
|
def show_at_button(self, button):
|
||||||
|
btn_screen_pos = button.ClientToScreen((0, 0))
|
||||||
|
popup_size = self.GetSize()
|
||||||
|
display_size = wx.GetDisplaySize()
|
||||||
|
|
||||||
|
# Try to show above the button, otherwise below
|
||||||
|
if btn_screen_pos.y - popup_size.height > 0:
|
||||||
|
popup_y = btn_screen_pos.y - popup_size.height
|
||||||
|
else:
|
||||||
|
popup_y = btn_screen_pos.y + button.GetSize().height
|
||||||
|
|
||||||
|
# Keep popup on screen horizontally
|
||||||
|
popup_x = max(10, min(btn_screen_pos.x, display_size.x - popup_size.width - 10))
|
||||||
|
|
||||||
|
self.Position((popup_x, popup_y), (0, 0))
|
||||||
|
self.Popup()
|
||||||
@@ -1,18 +1,11 @@
|
|||||||
import ipaddress
|
import ipaddress
|
||||||
import logging
|
import logging
|
||||||
import socket
|
|
||||||
import threading
|
|
||||||
from typing import Callable, Iterable, List, Optional
|
from typing import Callable, Iterable, List, Optional
|
||||||
|
|
||||||
from irc import server as irc_server
|
from irc import server as irc_server
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _default_log_callback(message: str, color=None, bold: bool = False):
|
|
||||||
"""Fallback logger when UI callback is not available."""
|
|
||||||
logger.info(message)
|
|
||||||
|
|
||||||
|
|
||||||
class LocalOnlyIRCServer(irc_server.IRCServer):
|
class LocalOnlyIRCServer(irc_server.IRCServer):
|
||||||
"""IRC server that only accepts connections from local/LAN addresses."""
|
"""IRC server that only accepts connections from local/LAN addresses."""
|
||||||
@@ -44,172 +37,6 @@ class LocalOnlyIRCServer(irc_server.IRCServer):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
class LocalServerManager:
|
|
||||||
"""Manages the background IRC server lifecycle."""
|
|
||||||
|
|
||||||
DEFAULT_CHANNELS = ["#lobby"]
|
|
||||||
DEFAULT_ALLOWED_NETWORKS = [
|
|
||||||
ipaddress.ip_network("127.0.0.0/8"), # Loopback
|
|
||||||
ipaddress.ip_network("10.0.0.0/8"), # RFC1918
|
|
||||||
ipaddress.ip_network("172.16.0.0/12"), # RFC1918
|
|
||||||
ipaddress.ip_network("192.168.0.0/16"), # RFC1918
|
|
||||||
ipaddress.ip_network("169.254.0.0/16"), # Link-local
|
|
||||||
]
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
log_callback: Callable[[str, Optional[object], bool], None] = _default_log_callback,
|
|
||||||
listen_host: str = "0.0.0.0",
|
|
||||||
listen_port: int = 6667,
|
|
||||||
):
|
|
||||||
self.log_callback = log_callback or _default_log_callback
|
|
||||||
self.listen_host = listen_host
|
|
||||||
self.listen_port = listen_port
|
|
||||||
self.allowed_networks = list(self.DEFAULT_ALLOWED_NETWORKS)
|
|
||||||
self._channels = list(self.DEFAULT_CHANNELS)
|
|
||||||
|
|
||||||
self._server: Optional[LocalOnlyIRCServer] = None
|
|
||||||
self._thread: Optional[threading.Thread] = None
|
|
||||||
self._lock = threading.RLock()
|
|
||||||
self._running = threading.Event()
|
|
||||||
self._ready = threading.Event()
|
|
||||||
self._error: Optional[Exception] = None
|
|
||||||
|
|
||||||
# Public API ---------------------------------------------------------
|
|
||||||
def start(self, timeout: float = 5.0):
|
|
||||||
"""Start the background IRC server."""
|
|
||||||
with self._lock:
|
|
||||||
if self._running.is_set():
|
|
||||||
raise RuntimeError("Local IRC server is already running.")
|
|
||||||
|
|
||||||
self._running.set()
|
|
||||||
self._ready.clear()
|
|
||||||
self._error = None
|
|
||||||
|
|
||||||
self._thread = threading.Thread(
|
|
||||||
target=self._serve_forever,
|
|
||||||
name="Local-IRC-Server",
|
|
||||||
daemon=True,
|
|
||||||
)
|
|
||||||
self._thread.start()
|
|
||||||
|
|
||||||
if not self._ready.wait(timeout):
|
|
||||||
self._running.clear()
|
|
||||||
raise TimeoutError("Local IRC server failed to start in time.")
|
|
||||||
|
|
||||||
if self._error:
|
|
||||||
raise self._error
|
|
||||||
|
|
||||||
def stop(self, timeout: float = 5.0):
|
|
||||||
"""Stop the IRC server if it is running."""
|
|
||||||
with self._lock:
|
|
||||||
if not self._running.is_set():
|
|
||||||
return
|
|
||||||
|
|
||||||
server = self._server
|
|
||||||
thread = self._thread
|
|
||||||
|
|
||||||
if server:
|
|
||||||
server.shutdown()
|
|
||||||
server.server_close()
|
|
||||||
|
|
||||||
if thread:
|
|
||||||
thread.join(timeout=timeout)
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
self._server = None
|
|
||||||
self._thread = None
|
|
||||||
self._running.clear()
|
|
||||||
self._ready.clear()
|
|
||||||
|
|
||||||
def is_running(self) -> bool:
|
|
||||||
return self._running.is_set()
|
|
||||||
|
|
||||||
def set_listen_host(self, host: str):
|
|
||||||
"""Update the bind address. Local server must be stopped."""
|
|
||||||
with self._lock:
|
|
||||||
if self._running.is_set():
|
|
||||||
raise RuntimeError("Stop the server before changing the interface.")
|
|
||||||
self.listen_host = host
|
|
||||||
self._log(f"Local server interface set to {host}.")
|
|
||||||
|
|
||||||
def get_channels(self) -> List[str]:
|
|
||||||
with self._lock:
|
|
||||||
return list(self._channels)
|
|
||||||
|
|
||||||
def set_channels(self, channels: Iterable[str]):
|
|
||||||
cleaned = self._sanitize_channels(channels)
|
|
||||||
with self._lock:
|
|
||||||
self._channels = cleaned or list(self.DEFAULT_CHANNELS)
|
|
||||||
|
|
||||||
if self.is_running():
|
|
||||||
self._log(
|
|
||||||
"Channel list updated. Restart local server to apply changes.",
|
|
||||||
bold=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Internal helpers ---------------------------------------------------
|
|
||||||
def _serve_forever(self):
|
|
||||||
try:
|
|
||||||
server = LocalOnlyIRCServer(
|
|
||||||
(self.listen_host, self.listen_port),
|
|
||||||
irc_server.IRCClient,
|
|
||||||
self.allowed_networks,
|
|
||||||
blocked_callback=lambda ip: self._log(
|
|
||||||
f"Blocked connection attempt from {ip}", bold=False
|
|
||||||
),
|
|
||||||
)
|
|
||||||
server.servername = socket.gethostname() or "wxirc-local"
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
self._server = server
|
|
||||||
seed_channels = self._channels or list(self.DEFAULT_CHANNELS)
|
|
||||||
for channel in seed_channels:
|
|
||||||
server.channels.setdefault(channel, irc_server.IRCChannel(channel))
|
|
||||||
|
|
||||||
self._log(
|
|
||||||
f"Local IRC server listening on {self.listen_host}:{self.listen_port}",
|
|
||||||
bold=True,
|
|
||||||
)
|
|
||||||
self._ready.set()
|
|
||||||
server.serve_forever()
|
|
||||||
except Exception as exc:
|
|
||||||
logger.exception("Local IRC server failed: %s", exc)
|
|
||||||
self._error = exc
|
|
||||||
self._ready.set()
|
|
||||||
self._log(f"Local server error: {exc}", bold=True)
|
|
||||||
finally:
|
|
||||||
self._running.clear()
|
|
||||||
self._log("Local IRC server stopped.")
|
|
||||||
|
|
||||||
def _sanitize_channels(self, channels: Iterable[str]) -> List[str]:
|
|
||||||
unique = []
|
|
||||||
seen = set()
|
|
||||||
for channel in channels:
|
|
||||||
if not channel:
|
|
||||||
continue
|
|
||||||
name = channel.strip()
|
|
||||||
if not name.startswith("#"):
|
|
||||||
name = f"#{name}"
|
|
||||||
if not self._is_valid_channel(name):
|
|
||||||
continue
|
|
||||||
if name.lower() not in seen:
|
|
||||||
unique.append(name)
|
|
||||||
seen.add(name.lower())
|
|
||||||
return unique
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _is_valid_channel(name: str) -> bool:
|
|
||||||
if len(name) < 2:
|
|
||||||
return False
|
|
||||||
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
|
|
||||||
return all(ch in allowed for ch in name)
|
|
||||||
|
|
||||||
def _log(self, message: str, color=None, bold: bool = False):
|
|
||||||
try:
|
|
||||||
self.log_callback(message, color, bold)
|
|
||||||
except Exception:
|
|
||||||
logger.info(message)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import argparse
|
import argparse
|
||||||
@@ -246,7 +73,7 @@ if __name__ == "__main__":
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Initialize the server manager
|
# Initialize the server manager
|
||||||
manager = LocalServerManager(
|
manager = LocalServerManager( # pyright: ignore[reportUndefinedVariable]
|
||||||
listen_host=args.host,
|
listen_host=args.host,
|
||||||
listen_port=args.port
|
listen_port=args.port
|
||||||
)
|
)
|
||||||
|
|||||||
179
src/LocalServerManager.py
Normal file
179
src/LocalServerManager.py
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
import ipaddress
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
import logging
|
||||||
|
from typing import Callable, Iterable, List, Optional
|
||||||
|
from LocalServer import LocalOnlyIRCServer
|
||||||
|
from irc import server as irc_server
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def _default_log_callback(message: str, color=None, bold: bool = False):
|
||||||
|
"""Fallback logger when UI callback is not available."""
|
||||||
|
logger.info(message)
|
||||||
|
|
||||||
|
class LocalServerManager:
|
||||||
|
"""Manages the background IRC server lifecycle."""
|
||||||
|
|
||||||
|
DEFAULT_CHANNELS = ["#lobby"]
|
||||||
|
DEFAULT_ALLOWED_NETWORKS = [
|
||||||
|
ipaddress.ip_network("127.0.0.0/8"), # Loopback
|
||||||
|
ipaddress.ip_network("10.0.0.0/8"), # RFC1918
|
||||||
|
ipaddress.ip_network("172.16.0.0/12"), # RFC1918
|
||||||
|
ipaddress.ip_network("192.168.0.0/16"), # RFC1918
|
||||||
|
ipaddress.ip_network("169.254.0.0/16"), # Link-local
|
||||||
|
]
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
log_callback: Callable[[str, Optional[object], bool], None] = _default_log_callback,
|
||||||
|
listen_host: str = "0.0.0.0",
|
||||||
|
listen_port: int = 6667,
|
||||||
|
):
|
||||||
|
self.log_callback = log_callback or _default_log_callback
|
||||||
|
self.listen_host = listen_host
|
||||||
|
self.listen_port = listen_port
|
||||||
|
self.allowed_networks = list(self.DEFAULT_ALLOWED_NETWORKS)
|
||||||
|
self._channels = list(self.DEFAULT_CHANNELS)
|
||||||
|
|
||||||
|
self._server = None
|
||||||
|
self._thread = None
|
||||||
|
self._lock = threading.RLock()
|
||||||
|
self._running = threading.Event()
|
||||||
|
self._ready = threading.Event()
|
||||||
|
self._error: Optional[Exception] = None
|
||||||
|
|
||||||
|
# Public API ---------------------------------------------------------
|
||||||
|
def start(self, timeout: float = 5.0):
|
||||||
|
"""Start the background IRC server."""
|
||||||
|
with self._lock:
|
||||||
|
if self._running.is_set():
|
||||||
|
raise RuntimeError("Local IRC server is already running.")
|
||||||
|
|
||||||
|
self._running.set()
|
||||||
|
self._ready.clear()
|
||||||
|
self._error = None
|
||||||
|
|
||||||
|
self._thread = threading.Thread(
|
||||||
|
target=self._serve_forever,
|
||||||
|
name="Local-IRC-Server",
|
||||||
|
daemon=True,
|
||||||
|
)
|
||||||
|
self._thread.start()
|
||||||
|
|
||||||
|
if not self._ready.wait(timeout):
|
||||||
|
self._running.clear()
|
||||||
|
raise TimeoutError("Local IRC server failed to start in time.")
|
||||||
|
|
||||||
|
if self._error:
|
||||||
|
raise self._error
|
||||||
|
|
||||||
|
def stop(self, timeout: float = 5.0):
|
||||||
|
"""Stop the IRC server if it is running."""
|
||||||
|
with self._lock:
|
||||||
|
if not self._running.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
|
server = self._server
|
||||||
|
thread = self._thread
|
||||||
|
|
||||||
|
if server:
|
||||||
|
server.shutdown()
|
||||||
|
server.server_close()
|
||||||
|
|
||||||
|
if thread:
|
||||||
|
thread.join(timeout=timeout)
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._server = None
|
||||||
|
self._thread = None
|
||||||
|
self._running.clear()
|
||||||
|
self._ready.clear()
|
||||||
|
|
||||||
|
def is_running(self) -> bool:
|
||||||
|
return self._running.is_set()
|
||||||
|
|
||||||
|
def set_listen_host(self, host: str):
|
||||||
|
"""Update the bind address. Local server must be stopped."""
|
||||||
|
with self._lock:
|
||||||
|
if self._running.is_set():
|
||||||
|
raise RuntimeError("Stop the server before changing the interface.")
|
||||||
|
self.listen_host = host
|
||||||
|
self._log(f"Local server interface set to {host}.")
|
||||||
|
|
||||||
|
def get_channels(self) -> List[str]:
|
||||||
|
with self._lock:
|
||||||
|
return list(self._channels)
|
||||||
|
|
||||||
|
def set_channels(self, channels: Iterable[str]):
|
||||||
|
cleaned = self._sanitize_channels(channels)
|
||||||
|
with self._lock:
|
||||||
|
self._channels = cleaned or list(self.DEFAULT_CHANNELS)
|
||||||
|
|
||||||
|
if self.is_running():
|
||||||
|
self._log(
|
||||||
|
"Channel list updated. Restart local server to apply changes.",
|
||||||
|
bold=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Internal helpers ---------------------------------------------------
|
||||||
|
def _serve_forever(self):
|
||||||
|
try:
|
||||||
|
server = LocalOnlyIRCServer(
|
||||||
|
(self.listen_host, self.listen_port),
|
||||||
|
irc_server.IRCClient,
|
||||||
|
self.allowed_networks,
|
||||||
|
blocked_callback=lambda ip: self._log(
|
||||||
|
f"Blocked connection attempt from {ip}", bold=False
|
||||||
|
),
|
||||||
|
)
|
||||||
|
server.servername = socket.gethostname() or "wxirc-local"
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._server = server
|
||||||
|
seed_channels = self._channels or list(self.DEFAULT_CHANNELS)
|
||||||
|
for channel in seed_channels:
|
||||||
|
server.channels.setdefault(channel, irc_server.IRCChannel(channel))
|
||||||
|
|
||||||
|
self._log(
|
||||||
|
f"Local IRC server listening on {self.listen_host}:{self.listen_port}",
|
||||||
|
bold=True,
|
||||||
|
)
|
||||||
|
self._ready.set()
|
||||||
|
server.serve_forever()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.exception("Local IRC server failed: %s", exc)
|
||||||
|
self._error = exc
|
||||||
|
self._ready.set()
|
||||||
|
self._log(f"Local server error: {exc}", bold=True)
|
||||||
|
finally:
|
||||||
|
self._running.clear()
|
||||||
|
self._log("Local IRC server stopped.")
|
||||||
|
|
||||||
|
def _sanitize_channels(self, channels: Iterable[str]) -> List[str]:
|
||||||
|
unique = []
|
||||||
|
seen = set()
|
||||||
|
for channel in channels:
|
||||||
|
if not channel:
|
||||||
|
continue
|
||||||
|
name = channel.strip()
|
||||||
|
if not name.startswith("#"):
|
||||||
|
name = f"#{name}"
|
||||||
|
if not self._is_valid_channel(name):
|
||||||
|
continue
|
||||||
|
if name.lower() not in seen:
|
||||||
|
unique.append(name)
|
||||||
|
seen.add(name.lower())
|
||||||
|
return unique
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_valid_channel(name: str) -> bool:
|
||||||
|
if len(name) < 2:
|
||||||
|
return False
|
||||||
|
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
|
||||||
|
return all(ch in allowed for ch in name)
|
||||||
|
|
||||||
|
def _log(self, message: str, color=None, bold: bool = False):
|
||||||
|
try:
|
||||||
|
self.log_callback(message, color, bold)
|
||||||
|
except Exception:
|
||||||
|
logger.info(message)
|
||||||
104
src/ManageChannelsDialog.py
Normal file
104
src/ManageChannelsDialog.py
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
import wx
|
||||||
|
import wx.adv
|
||||||
|
|
||||||
|
class ManageChannelsDialog(wx.Dialog):
|
||||||
|
"""Simple dialog for curating the local server channel allowlist."""
|
||||||
|
|
||||||
|
def __init__(self, parent, channels):
|
||||||
|
super().__init__(parent, title="Manage Local Channels", size=(360, 420))
|
||||||
|
try:
|
||||||
|
self.SetIcon(parent.GetIcon())
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
panel = wx.Panel(self)
|
||||||
|
panel.SetBackgroundColour(parent.theme["window_bg"])
|
||||||
|
|
||||||
|
main_sizer = wx.BoxSizer(wx.VERTICAL)
|
||||||
|
info = wx.StaticText(
|
||||||
|
panel,
|
||||||
|
label="Channels are shared with anyone on your LAN who joins the built-in server.",
|
||||||
|
)
|
||||||
|
info.Wrap(320)
|
||||||
|
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 8)
|
||||||
|
|
||||||
|
self.list_box = wx.ListBox(panel)
|
||||||
|
for channel in channels:
|
||||||
|
self.list_box.Append(channel)
|
||||||
|
main_sizer.Add(self.list_box, 1, wx.ALL | wx.EXPAND, 8)
|
||||||
|
|
||||||
|
input_sizer = wx.BoxSizer(wx.HORIZONTAL)
|
||||||
|
self.channel_input = wx.TextCtrl(panel, style=wx.TE_PROCESS_ENTER)
|
||||||
|
self.channel_input.SetHint("#channel-name")
|
||||||
|
add_btn = wx.Button(panel, label="Add")
|
||||||
|
input_sizer.Add(self.channel_input, 1, wx.RIGHT, 4)
|
||||||
|
input_sizer.Add(add_btn, 0)
|
||||||
|
main_sizer.Add(input_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 8)
|
||||||
|
|
||||||
|
remove_btn = wx.Button(panel, label="Remove Selected")
|
||||||
|
reset_btn = wx.Button(panel, label="Reset to #lobby")
|
||||||
|
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
|
||||||
|
btn_sizer.Add(remove_btn, 1, wx.RIGHT, 4)
|
||||||
|
btn_sizer.Add(reset_btn, 1)
|
||||||
|
main_sizer.Add(btn_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP, 8)
|
||||||
|
|
||||||
|
button_bar = wx.StdDialogButtonSizer()
|
||||||
|
ok_btn = wx.Button(panel, wx.ID_OK)
|
||||||
|
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
|
||||||
|
button_bar.AddButton(ok_btn)
|
||||||
|
button_bar.AddButton(cancel_btn)
|
||||||
|
button_bar.Realize()
|
||||||
|
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 8)
|
||||||
|
|
||||||
|
panel.SetSizer(main_sizer)
|
||||||
|
|
||||||
|
# Bindings
|
||||||
|
add_btn.Bind(wx.EVT_BUTTON, self.on_add)
|
||||||
|
remove_btn.Bind(wx.EVT_BUTTON, self.on_remove)
|
||||||
|
reset_btn.Bind(wx.EVT_BUTTON, self.on_reset)
|
||||||
|
self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_add)
|
||||||
|
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
|
||||||
|
|
||||||
|
def get_channels(self):
|
||||||
|
return [self.list_box.GetString(i) for i in range(self.list_box.GetCount())]
|
||||||
|
|
||||||
|
def on_add(self, event):
|
||||||
|
value = self.channel_input.GetValue().strip()
|
||||||
|
if not value:
|
||||||
|
return
|
||||||
|
if not value.startswith('#'):
|
||||||
|
value = f"#{value}"
|
||||||
|
if not self._is_valid_channel(value):
|
||||||
|
wx.MessageBox(
|
||||||
|
"Channel names may contain letters, numbers, -, and _ only.",
|
||||||
|
"Invalid Channel",
|
||||||
|
wx.OK | wx.ICON_WARNING,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if self.list_box.FindString(value) != wx.NOT_FOUND:
|
||||||
|
wx.MessageBox("Channel already exists.", "Duplicate Channel", wx.OK | wx.ICON_INFORMATION)
|
||||||
|
return
|
||||||
|
self.list_box.Append(value)
|
||||||
|
self.channel_input.Clear()
|
||||||
|
|
||||||
|
def on_remove(self, event):
|
||||||
|
selection = self.list_box.GetSelection()
|
||||||
|
if selection != wx.NOT_FOUND:
|
||||||
|
self.list_box.Delete(selection)
|
||||||
|
|
||||||
|
def on_reset(self, event):
|
||||||
|
self.list_box.Clear()
|
||||||
|
self.list_box.Append("#lobby")
|
||||||
|
|
||||||
|
def on_ok(self, event):
|
||||||
|
if self.list_box.GetCount() == 0:
|
||||||
|
wx.MessageBox("Add at least one channel before saving.", "No Channels", wx.OK | wx.ICON_INFORMATION)
|
||||||
|
return
|
||||||
|
event.Skip()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _is_valid_channel(name):
|
||||||
|
if len(name) < 2:
|
||||||
|
return False
|
||||||
|
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
|
||||||
|
return all(ch in allowed for ch in name)
|
||||||
|
|
||||||
120
src/ScanHandler.py
Normal file
120
src/ScanHandler.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
import psutil
|
||||||
|
import ipaddress
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class ScanHandler:
|
||||||
|
"""Fast local network IRC scanner with minimal network overhead."""
|
||||||
|
|
||||||
|
def __init__(self, timeout=0.35, max_workers=64):
|
||||||
|
self.timeout = timeout
|
||||||
|
self.max_workers = max_workers
|
||||||
|
self._stop_event = threading.Event()
|
||||||
|
self._thread = None
|
||||||
|
self.results = []
|
||||||
|
self.total_hosts = 0
|
||||||
|
|
||||||
|
def detect_networks(self):
|
||||||
|
"""Return private IPv4 networks discovered on local interfaces."""
|
||||||
|
networks = []
|
||||||
|
try:
|
||||||
|
for iface, addrs in psutil.net_if_addrs().items():
|
||||||
|
for addr in addrs:
|
||||||
|
if addr.family != socket.AF_INET or not addr.netmask:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}")
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if not interface.network.is_private:
|
||||||
|
continue
|
||||||
|
network = self._cap_network(interface)
|
||||||
|
label = f"{iface} : {interface.ip} through {network.with_prefixlen} range"
|
||||||
|
networks.append({"label": label, "cidr": str(network)})
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Failed to enumerate interfaces: %s", exc)
|
||||||
|
|
||||||
|
if not networks:
|
||||||
|
default_net = "192.168.1.0/24"
|
||||||
|
label = f"Default guess : {default_net}"
|
||||||
|
networks.append({"label": label, "cidr": default_net})
|
||||||
|
return networks
|
||||||
|
|
||||||
|
def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None):
|
||||||
|
"""Launch the threaded scan."""
|
||||||
|
if self._thread and self._thread.is_alive():
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
network = ipaddress.ip_network(network_cidr, strict=False)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
hosts = [str(host) for host in network.hosts()]
|
||||||
|
if not hosts:
|
||||||
|
hosts = [str(network.network_address)]
|
||||||
|
self.total_hosts = len(hosts)
|
||||||
|
self.results.clear()
|
||||||
|
self._stop_event.clear()
|
||||||
|
|
||||||
|
def _worker():
|
||||||
|
logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr)
|
||||||
|
scanned = 0
|
||||||
|
try:
|
||||||
|
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
|
||||||
|
futures = {executor.submit(self._probe_host, host, ports): host for host in hosts}
|
||||||
|
for future in as_completed(futures):
|
||||||
|
if self._stop_event.is_set():
|
||||||
|
break
|
||||||
|
scanned += 1
|
||||||
|
server_info = future.result()
|
||||||
|
if server_info:
|
||||||
|
self.results.append(server_info)
|
||||||
|
if result_cb:
|
||||||
|
result_cb(server_info)
|
||||||
|
if progress_cb:
|
||||||
|
progress_cb(scanned, self.total_hosts)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Scan failure: %s", exc)
|
||||||
|
finally:
|
||||||
|
if done_cb:
|
||||||
|
done_cb(self.results)
|
||||||
|
logger.info("IRC scan finished (%s discovered)", len(self.results))
|
||||||
|
|
||||||
|
self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True)
|
||||||
|
self._thread.start()
|
||||||
|
return True
|
||||||
|
|
||||||
|
def stop_scan(self):
|
||||||
|
self._stop_event.set()
|
||||||
|
|
||||||
|
def _probe_host(self, host, ports):
|
||||||
|
if self._stop_event.is_set():
|
||||||
|
return None
|
||||||
|
for port in ports:
|
||||||
|
try:
|
||||||
|
with socket.create_connection((host, port), timeout=self.timeout) as sock:
|
||||||
|
sock.settimeout(0.2)
|
||||||
|
banner = ""
|
||||||
|
try:
|
||||||
|
chunk = sock.recv(256)
|
||||||
|
if chunk:
|
||||||
|
banner = chunk.decode(errors="ignore").strip()
|
||||||
|
except socket.timeout:
|
||||||
|
banner = "IRC server (silent banner)"
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return {"address": host, "port": port, "banner": banner or "IRC server detected"}
|
||||||
|
except (socket.timeout, ConnectionRefusedError, OSError):
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _cap_network(interface):
|
||||||
|
"""Cap huge networks to /24 to keep scans lightweight."""
|
||||||
|
if interface.network.prefixlen >= 24:
|
||||||
|
return interface.network
|
||||||
|
return ipaddress.ip_network(f"{interface.ip}/24", strict=False)
|
||||||
@@ -1,299 +1,19 @@
|
|||||||
import ipaddress
|
|
||||||
import logging
|
import logging
|
||||||
import socket
|
import ScanHandler
|
||||||
import threading
|
import ScanWizardIntroPage
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
import ScanWizardResultsPage
|
||||||
|
|
||||||
import psutil
|
|
||||||
import wx
|
import wx
|
||||||
import wx.adv as adv
|
import wx.adv as adv
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ScanHandler:
|
|
||||||
"""Fast local network IRC scanner with minimal network overhead."""
|
|
||||||
|
|
||||||
def __init__(self, timeout=0.35, max_workers=64):
|
|
||||||
self.timeout = timeout
|
|
||||||
self.max_workers = max_workers
|
|
||||||
self._stop_event = threading.Event()
|
|
||||||
self._thread = None
|
|
||||||
self.results = []
|
|
||||||
self.total_hosts = 0
|
|
||||||
|
|
||||||
def detect_networks(self):
|
|
||||||
"""Return private IPv4 networks discovered on local interfaces."""
|
|
||||||
networks = []
|
|
||||||
try:
|
|
||||||
for iface, addrs in psutil.net_if_addrs().items():
|
|
||||||
for addr in addrs:
|
|
||||||
if addr.family != socket.AF_INET or not addr.netmask:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
interface = ipaddress.IPv4Interface(f"{addr.address}/{addr.netmask}")
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
if not interface.network.is_private:
|
|
||||||
continue
|
|
||||||
network = self._cap_network(interface)
|
|
||||||
label = f"{iface} : {interface.ip} through {network.with_prefixlen} range"
|
|
||||||
networks.append({"label": label, "cidr": str(network)})
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error("Failed to enumerate interfaces: %s", exc)
|
|
||||||
|
|
||||||
if not networks:
|
|
||||||
default_net = "192.168.1.0/24"
|
|
||||||
label = f"Default guess : {default_net}"
|
|
||||||
networks.append({"label": label, "cidr": default_net})
|
|
||||||
return networks
|
|
||||||
|
|
||||||
def start_scan(self, network_cidr, ports, progress_cb=None, result_cb=None, done_cb=None):
|
|
||||||
"""Launch the threaded scan."""
|
|
||||||
if self._thread and self._thread.is_alive():
|
|
||||||
return False
|
|
||||||
try:
|
|
||||||
network = ipaddress.ip_network(network_cidr, strict=False)
|
|
||||||
except ValueError:
|
|
||||||
return False
|
|
||||||
|
|
||||||
hosts = [str(host) for host in network.hosts()]
|
|
||||||
if not hosts:
|
|
||||||
hosts = [str(network.network_address)]
|
|
||||||
self.total_hosts = len(hosts)
|
|
||||||
self.results.clear()
|
|
||||||
self._stop_event.clear()
|
|
||||||
|
|
||||||
def _worker():
|
|
||||||
logger.info("Starting IRC scan across %s hosts (%s)", len(hosts), network_cidr)
|
|
||||||
scanned = 0
|
|
||||||
try:
|
|
||||||
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
|
|
||||||
futures = {executor.submit(self._probe_host, host, ports): host for host in hosts}
|
|
||||||
for future in as_completed(futures):
|
|
||||||
if self._stop_event.is_set():
|
|
||||||
break
|
|
||||||
scanned += 1
|
|
||||||
server_info = future.result()
|
|
||||||
if server_info:
|
|
||||||
self.results.append(server_info)
|
|
||||||
if result_cb:
|
|
||||||
result_cb(server_info)
|
|
||||||
if progress_cb:
|
|
||||||
progress_cb(scanned, self.total_hosts)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error("Scan failure: %s", exc)
|
|
||||||
finally:
|
|
||||||
if done_cb:
|
|
||||||
done_cb(self.results)
|
|
||||||
logger.info("IRC scan finished (%s discovered)", len(self.results))
|
|
||||||
|
|
||||||
self._thread = threading.Thread(target=_worker, name="IRC-Scan", daemon=True)
|
|
||||||
self._thread.start()
|
|
||||||
return True
|
|
||||||
|
|
||||||
def stop_scan(self):
|
|
||||||
self._stop_event.set()
|
|
||||||
|
|
||||||
def _probe_host(self, host, ports):
|
|
||||||
if self._stop_event.is_set():
|
|
||||||
return None
|
|
||||||
for port in ports:
|
|
||||||
try:
|
|
||||||
with socket.create_connection((host, port), timeout=self.timeout) as sock:
|
|
||||||
sock.settimeout(0.2)
|
|
||||||
banner = ""
|
|
||||||
try:
|
|
||||||
chunk = sock.recv(256)
|
|
||||||
if chunk:
|
|
||||||
banner = chunk.decode(errors="ignore").strip()
|
|
||||||
except socket.timeout:
|
|
||||||
banner = "IRC server (silent banner)"
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
return {"address": host, "port": port, "banner": banner or "IRC server detected"}
|
|
||||||
except (socket.timeout, ConnectionRefusedError, OSError):
|
|
||||||
continue
|
|
||||||
return None
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _cap_network(interface):
|
|
||||||
"""Cap huge networks to /24 to keep scans lightweight."""
|
|
||||||
if interface.network.prefixlen >= 24:
|
|
||||||
return interface.network
|
|
||||||
return ipaddress.ip_network(f"{interface.ip}/24", strict=False)
|
|
||||||
|
|
||||||
|
|
||||||
class ScanWizardIntroPage(adv.WizardPageSimple):
|
|
||||||
def __init__(self, parent, scan_handler):
|
|
||||||
super().__init__(parent)
|
|
||||||
self.scan_handler = scan_handler
|
|
||||||
self.networks = self.scan_handler.detect_networks()
|
|
||||||
self._build_ui()
|
|
||||||
|
|
||||||
def _build_ui(self):
|
|
||||||
sizer = wx.BoxSizer(wx.VERTICAL)
|
|
||||||
|
|
||||||
intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.")
|
|
||||||
intro.Wrap(420)
|
|
||||||
sizer.Add(intro, 0, wx.ALL, 5)
|
|
||||||
|
|
||||||
sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8)
|
|
||||||
labels = [net["label"] for net in self.networks]
|
|
||||||
self.network_choice = wx.Choice(self, choices=labels)
|
|
||||||
if labels:
|
|
||||||
self.network_choice.SetSelection(0)
|
|
||||||
sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5)
|
|
||||||
|
|
||||||
sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8)
|
|
||||||
self.port_ctrl = wx.TextCtrl(self, value="6667,6697")
|
|
||||||
sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5)
|
|
||||||
|
|
||||||
self.SetSizer(sizer)
|
|
||||||
|
|
||||||
def get_scan_params(self):
|
|
||||||
selection = self.network_choice.GetSelection()
|
|
||||||
if selection == wx.NOT_FOUND:
|
|
||||||
wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING)
|
|
||||||
return None
|
|
||||||
cidr = self.networks[selection]["cidr"]
|
|
||||||
raw_ports = self.port_ctrl.GetValue().split(",")
|
|
||||||
ports = []
|
|
||||||
for raw in raw_ports:
|
|
||||||
raw = raw.strip()
|
|
||||||
if not raw:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
port = int(raw)
|
|
||||||
if 1 <= port <= 65535:
|
|
||||||
ports.append(port)
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
if not ports:
|
|
||||||
wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING)
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
network = ipaddress.ip_network(cidr, strict=False)
|
|
||||||
except ValueError:
|
|
||||||
wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR)
|
|
||||||
return None
|
|
||||||
host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1)
|
|
||||||
return {"cidr": str(network), "ports": ports, "host_count": host_count}
|
|
||||||
|
|
||||||
|
|
||||||
class ScanWizardResultsPage(adv.WizardPageSimple):
|
|
||||||
def __init__(self, parent, scan_handler, main_frame):
|
|
||||||
super().__init__(parent)
|
|
||||||
self.scan_handler = scan_handler
|
|
||||||
self.main_frame = main_frame
|
|
||||||
self.discovered = []
|
|
||||||
self._build_ui()
|
|
||||||
|
|
||||||
def _build_ui(self):
|
|
||||||
sizer = wx.BoxSizer(wx.VERTICAL)
|
|
||||||
|
|
||||||
self.summary = wx.StaticText(self, label="Waiting to start…")
|
|
||||||
sizer.Add(self.summary, 0, wx.ALL, 5)
|
|
||||||
|
|
||||||
self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH)
|
|
||||||
sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5)
|
|
||||||
|
|
||||||
self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
|
|
||||||
self.results_list.InsertColumn(0, "Address", width=140)
|
|
||||||
self.results_list.InsertColumn(1, "Port", width=60)
|
|
||||||
self.results_list.InsertColumn(2, "Details", width=260)
|
|
||||||
self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons)
|
|
||||||
self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons)
|
|
||||||
sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5)
|
|
||||||
|
|
||||||
btn_row = wx.BoxSizer(wx.HORIZONTAL)
|
|
||||||
self.quick_connect_btn = wx.Button(self, label="Quick Connect")
|
|
||||||
self.quick_connect_btn.Disable()
|
|
||||||
self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect)
|
|
||||||
btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5)
|
|
||||||
|
|
||||||
self.rescan_btn = wx.Button(self, label="Rescan")
|
|
||||||
self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan)
|
|
||||||
btn_row.Add(self.rescan_btn, 0)
|
|
||||||
sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5)
|
|
||||||
|
|
||||||
self.SetSizer(sizer)
|
|
||||||
|
|
||||||
def prepare_for_scan(self, params, start_callback):
|
|
||||||
self.results_list.DeleteAllItems()
|
|
||||||
self.discovered = []
|
|
||||||
self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}…")
|
|
||||||
self.gauge.SetRange(max(params["host_count"], 1))
|
|
||||||
self.gauge.SetValue(0)
|
|
||||||
self.quick_connect_btn.Disable()
|
|
||||||
start_callback(params)
|
|
||||||
|
|
||||||
def on_scan_progress(self, scanned, total):
|
|
||||||
try:
|
|
||||||
total = max(total, 1)
|
|
||||||
self.gauge.SetRange(total)
|
|
||||||
self.gauge.SetValue(min(scanned, total))
|
|
||||||
self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked")
|
|
||||||
except RuntimeError:
|
|
||||||
# C++ SHIT
|
|
||||||
logger.debug("Scan progress update after controls destroyed; ignoring")
|
|
||||||
|
|
||||||
def on_scan_result(self, server_info):
|
|
||||||
"""Handle a single discovered server row."""
|
|
||||||
try:
|
|
||||||
idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"])
|
|
||||||
self.results_list.SetItem(idx, 1, str(server_info["port"]))
|
|
||||||
self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected"))
|
|
||||||
self.discovered.append(server_info)
|
|
||||||
self.summary.SetLabel(
|
|
||||||
f"Found {len(self.discovered)} {'server' if len(self.discovered) == 1 else 'servers'}"
|
|
||||||
)
|
|
||||||
except RuntimeError:
|
|
||||||
logger.debug("Scan result update after controls destroyed; ignoring")
|
|
||||||
|
|
||||||
def on_scan_complete(self, results):
|
|
||||||
"""Final scan completion callback."""
|
|
||||||
try:
|
|
||||||
if results:
|
|
||||||
self.summary.SetLabel(
|
|
||||||
f"Scan complete : {len(results)} "
|
|
||||||
f"{'server' if len(results) == 1 else 'servers'} ready."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.summary.SetLabel("Scan complete : no IRC servers discovered.")
|
|
||||||
self._toggle_buttons()
|
|
||||||
except RuntimeError:
|
|
||||||
logger.debug("Scan completion update after controls destroyed; ignoring")
|
|
||||||
|
|
||||||
def on_quick_connect(self, event):
|
|
||||||
row = self.results_list.GetFirstSelected()
|
|
||||||
if row == -1:
|
|
||||||
return
|
|
||||||
server = self.results_list.GetItemText(row, 0)
|
|
||||||
port = int(self.results_list.GetItemText(row, 1))
|
|
||||||
if self.main_frame.quick_connect(server, port):
|
|
||||||
self.GetParent().EndModal(wx.ID_OK)
|
|
||||||
|
|
||||||
def on_rescan(self, event):
|
|
||||||
wizard = self.GetParent()
|
|
||||||
wizard.ShowPage(wizard.intro_page)
|
|
||||||
|
|
||||||
def _toggle_buttons(self, event=None):
|
|
||||||
has_selection = self.results_list.GetFirstSelected() != -1
|
|
||||||
if has_selection:
|
|
||||||
self.quick_connect_btn.Enable()
|
|
||||||
else:
|
|
||||||
self.quick_connect_btn.Disable()
|
|
||||||
|
|
||||||
|
|
||||||
class ScanWizardDialog(adv.Wizard):
|
class ScanWizardDialog(adv.Wizard):
|
||||||
"""Wizard that drives the ScanHandler workflow."""
|
|
||||||
|
|
||||||
def __init__(self, parent):
|
def __init__(self, parent):
|
||||||
super().__init__(parent, title="wxScan")
|
super().__init__(parent, title="wxScan")
|
||||||
self.scan_handler = ScanHandler()
|
self.scan_handler = ScanHandler.ScanHandler()
|
||||||
self.intro_page = ScanWizardIntroPage(self, self.scan_handler)
|
self.intro_page = ScanWizardIntroPage.ScanWizardIntroPage(self, self.scan_handler)
|
||||||
self.results_page = ScanWizardResultsPage(self, self.scan_handler, parent)
|
self.results_page = ScanWizardResultsPage.ScanWizardResultsPage(self, self.scan_handler, parent)
|
||||||
self._chain_pages()
|
self._chain_pages()
|
||||||
self.Bind(adv.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing)
|
self.Bind(adv.EVT_WIZARD_PAGE_CHANGING, self.on_page_changing)
|
||||||
self.Bind(adv.EVT_WIZARD_CANCEL, self.on_cancel)
|
self.Bind(adv.EVT_WIZARD_CANCEL, self.on_cancel)
|
||||||
|
|||||||
59
src/ScanWizardIntroPage.py
Normal file
59
src/ScanWizardIntroPage.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import wx
|
||||||
|
import wx.adv as adv
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
class ScanWizardIntroPage(adv.WizardPageSimple):
|
||||||
|
def __init__(self, parent, scan_handler):
|
||||||
|
super().__init__(parent)
|
||||||
|
self.scan_handler = scan_handler
|
||||||
|
self.networks = self.scan_handler.detect_networks()
|
||||||
|
self._build_ui()
|
||||||
|
|
||||||
|
def _build_ui(self):
|
||||||
|
sizer = wx.BoxSizer(wx.VERTICAL)
|
||||||
|
|
||||||
|
intro = wx.StaticText(self, label="Scan the local network for open IRC servers.\n\n Security Warning: This scan may reveal information about your device, and may make you vulnerable to attacks.")
|
||||||
|
intro.Wrap(420)
|
||||||
|
sizer.Add(intro, 0, wx.ALL, 5)
|
||||||
|
|
||||||
|
sizer.Add(wx.StaticText(self, label="Network"), 0, wx.TOP | wx.LEFT, 8)
|
||||||
|
labels = [net["label"] for net in self.networks]
|
||||||
|
self.network_choice = wx.Choice(self, choices=labels)
|
||||||
|
if labels:
|
||||||
|
self.network_choice.SetSelection(0)
|
||||||
|
sizer.Add(self.network_choice, 0, wx.EXPAND | wx.ALL, 5)
|
||||||
|
|
||||||
|
sizer.Add(wx.StaticText(self, label="Ports (comma separated)"), 0, wx.TOP | wx.LEFT, 8)
|
||||||
|
self.port_ctrl = wx.TextCtrl(self, value="6667,6697")
|
||||||
|
sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 5)
|
||||||
|
|
||||||
|
self.SetSizer(sizer)
|
||||||
|
|
||||||
|
def get_scan_params(self):
|
||||||
|
selection = self.network_choice.GetSelection()
|
||||||
|
if selection == wx.NOT_FOUND:
|
||||||
|
wx.MessageBox("Select a network to scan.", "Missing selection", wx.OK | wx.ICON_WARNING)
|
||||||
|
return None
|
||||||
|
cidr = self.networks[selection]["cidr"]
|
||||||
|
raw_ports = self.port_ctrl.GetValue().split(",")
|
||||||
|
ports = []
|
||||||
|
for raw in raw_ports:
|
||||||
|
raw = raw.strip()
|
||||||
|
if not raw:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
port = int(raw)
|
||||||
|
if 1 <= port <= 65535:
|
||||||
|
ports.append(port)
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if not ports:
|
||||||
|
wx.MessageBox("Enter at least one valid TCP port.", "Invalid ports", wx.OK | wx.ICON_WARNING)
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
network = ipaddress.ip_network(cidr, strict=False)
|
||||||
|
except ValueError:
|
||||||
|
wx.MessageBox("Invalid network selection.", "Network error", wx.OK | wx.ICON_ERROR)
|
||||||
|
return None
|
||||||
|
host_count = max(network.num_addresses - (2 if network.version == 4 and network.prefixlen <= 30 else 0), 1)
|
||||||
|
return {"cidr": str(network), "ports": ports, "host_count": host_count}
|
||||||
109
src/ScanWizardResultsPage.py
Normal file
109
src/ScanWizardResultsPage.py
Normal file
@@ -0,0 +1,109 @@
|
|||||||
|
import wx
|
||||||
|
import wx.adv as adv
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class ScanWizardResultsPage(adv.WizardPageSimple):
|
||||||
|
def __init__(self, parent, scan_handler, main_frame):
|
||||||
|
super().__init__(parent)
|
||||||
|
self.scan_handler = scan_handler
|
||||||
|
self.main_frame = main_frame
|
||||||
|
self.discovered = []
|
||||||
|
self._build_ui()
|
||||||
|
|
||||||
|
def _build_ui(self):
|
||||||
|
sizer = wx.BoxSizer(wx.VERTICAL)
|
||||||
|
|
||||||
|
self.summary = wx.StaticText(self, label="Waiting to start…")
|
||||||
|
sizer.Add(self.summary, 0, wx.ALL, 5)
|
||||||
|
|
||||||
|
self.gauge = wx.Gauge(self, range=100, style=wx.GA_SMOOTH)
|
||||||
|
sizer.Add(self.gauge, 0, wx.EXPAND | wx.ALL, 5)
|
||||||
|
|
||||||
|
self.results_list = wx.ListCtrl(self, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
|
||||||
|
self.results_list.InsertColumn(0, "Address", width=140)
|
||||||
|
self.results_list.InsertColumn(1, "Port", width=60)
|
||||||
|
self.results_list.InsertColumn(2, "Details", width=260)
|
||||||
|
self.results_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self._toggle_buttons)
|
||||||
|
self.results_list.Bind(wx.EVT_LIST_ITEM_DESELECTED, self._toggle_buttons)
|
||||||
|
sizer.Add(self.results_list, 1, wx.EXPAND | wx.ALL, 5)
|
||||||
|
|
||||||
|
btn_row = wx.BoxSizer(wx.HORIZONTAL)
|
||||||
|
self.quick_connect_btn = wx.Button(self, label="Quick Connect")
|
||||||
|
self.quick_connect_btn.Disable()
|
||||||
|
self.quick_connect_btn.Bind(wx.EVT_BUTTON, self.on_quick_connect)
|
||||||
|
btn_row.Add(self.quick_connect_btn, 0, wx.RIGHT, 5)
|
||||||
|
|
||||||
|
self.rescan_btn = wx.Button(self, label="Rescan")
|
||||||
|
self.rescan_btn.Bind(wx.EVT_BUTTON, self.on_rescan)
|
||||||
|
btn_row.Add(self.rescan_btn, 0)
|
||||||
|
sizer.Add(btn_row, 0, wx.ALL | wx.ALIGN_RIGHT, 5)
|
||||||
|
|
||||||
|
self.SetSizer(sizer)
|
||||||
|
|
||||||
|
def prepare_for_scan(self, params, start_callback):
|
||||||
|
self.results_list.DeleteAllItems()
|
||||||
|
self.discovered = []
|
||||||
|
self.summary.SetLabel(f"Scanning {params['cidr']} on ports {', '.join(map(str, params['ports']))}…")
|
||||||
|
self.gauge.SetRange(max(params["host_count"], 1))
|
||||||
|
self.gauge.SetValue(0)
|
||||||
|
self.quick_connect_btn.Disable()
|
||||||
|
start_callback(params)
|
||||||
|
|
||||||
|
def on_scan_progress(self, scanned, total):
|
||||||
|
try:
|
||||||
|
total = max(total, 1)
|
||||||
|
self.gauge.SetRange(total)
|
||||||
|
self.gauge.SetValue(min(scanned, total))
|
||||||
|
self.summary.SetLabel(f"Scanning… {scanned}/{total} hosts checked")
|
||||||
|
except RuntimeError:
|
||||||
|
# C++ SHIT
|
||||||
|
logger.debug("Scan progress update after controls destroyed; ignoring")
|
||||||
|
|
||||||
|
def on_scan_result(self, server_info):
|
||||||
|
"""Handle a single discovered server row."""
|
||||||
|
try:
|
||||||
|
idx = self.results_list.InsertItem(self.results_list.GetItemCount(), server_info["address"])
|
||||||
|
self.results_list.SetItem(idx, 1, str(server_info["port"]))
|
||||||
|
self.results_list.SetItem(idx, 2, server_info.get("banner", "IRC server detected"))
|
||||||
|
self.discovered.append(server_info)
|
||||||
|
self.summary.SetLabel(
|
||||||
|
f"Found {len(self.discovered)} {'server' if len(self.discovered) == 1 else 'servers'}"
|
||||||
|
)
|
||||||
|
except RuntimeError:
|
||||||
|
logger.debug("Scan result update after controls destroyed; ignoring")
|
||||||
|
|
||||||
|
def on_scan_complete(self, results):
|
||||||
|
"""Final scan completion callback."""
|
||||||
|
try:
|
||||||
|
if results:
|
||||||
|
self.summary.SetLabel(
|
||||||
|
f"Scan complete : {len(results)} "
|
||||||
|
f"{'server' if len(results) == 1 else 'servers'} ready."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.summary.SetLabel("Scan complete : no IRC servers discovered.")
|
||||||
|
self._toggle_buttons()
|
||||||
|
except RuntimeError:
|
||||||
|
logger.debug("Scan completion update after controls destroyed; ignoring")
|
||||||
|
|
||||||
|
def on_quick_connect(self, event):
|
||||||
|
row = self.results_list.GetFirstSelected()
|
||||||
|
if row == -1:
|
||||||
|
return
|
||||||
|
server = self.results_list.GetItemText(row, 0)
|
||||||
|
port = int(self.results_list.GetItemText(row, 1))
|
||||||
|
if self.main_frame.quick_connect(server, port):
|
||||||
|
self.GetParent().EndModal(wx.ID_OK)
|
||||||
|
|
||||||
|
def on_rescan(self, event):
|
||||||
|
wizard = self.GetParent()
|
||||||
|
wizard.ShowPage(wizard.intro_page)
|
||||||
|
|
||||||
|
def _toggle_buttons(self, event=None):
|
||||||
|
has_selection = self.results_list.GetFirstSelected() != -1
|
||||||
|
if has_selection:
|
||||||
|
self.quick_connect_btn.Enable()
|
||||||
|
else:
|
||||||
|
self.quick_connect_btn.Disable()
|
||||||
205
src/main.py
205
src/main.py
@@ -17,10 +17,12 @@ import sys
|
|||||||
from PrivacyNoticeDialog import PrivacyNoticeDialog
|
from PrivacyNoticeDialog import PrivacyNoticeDialog
|
||||||
from IRCPanel import IRCPanel
|
from IRCPanel import IRCPanel
|
||||||
from AboutDialog import AboutDialog
|
from AboutDialog import AboutDialog
|
||||||
|
from InterfaceSelectDialog import InterfaceSelectDialog
|
||||||
|
from ManageChannelsDialog import ManageChannelsDialog
|
||||||
from NotesDialog import NotesDialog
|
from NotesDialog import NotesDialog
|
||||||
if os.name == "nt": from Win32API import Win32API ; from Win32SoundHandler import Win32SoundHandler
|
if os.name == "nt": from Win32API import Win32API ; from Win32SoundHandler import Win32SoundHandler
|
||||||
from ScanWizard import ScanWizardDialog
|
from ScanWizard import ScanWizardDialog
|
||||||
from LocalServer import LocalServerManager
|
from LocalServerManager import LocalServerManager
|
||||||
|
|
||||||
# Set up logging
|
# Set up logging
|
||||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||||
@@ -43,207 +45,6 @@ class UIUpdate:
|
|||||||
self.args = args
|
self.args = args
|
||||||
self.kwargs = kwargs
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
|
||||||
class ManageChannelsDialog(wx.Dialog):
|
|
||||||
"""Simple dialog for curating the local server channel allowlist."""
|
|
||||||
|
|
||||||
def __init__(self, parent, channels):
|
|
||||||
super().__init__(parent, title="Manage Local Channels", size=(360, 420))
|
|
||||||
try:
|
|
||||||
self.SetIcon(parent.GetIcon())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
panel = wx.Panel(self)
|
|
||||||
panel.SetBackgroundColour(parent.theme["window_bg"])
|
|
||||||
|
|
||||||
main_sizer = wx.BoxSizer(wx.VERTICAL)
|
|
||||||
info = wx.StaticText(
|
|
||||||
panel,
|
|
||||||
label="Channels are shared with anyone on your LAN who joins the built-in server.",
|
|
||||||
)
|
|
||||||
info.Wrap(320)
|
|
||||||
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 8)
|
|
||||||
|
|
||||||
self.list_box = wx.ListBox(panel)
|
|
||||||
for channel in channels:
|
|
||||||
self.list_box.Append(channel)
|
|
||||||
main_sizer.Add(self.list_box, 1, wx.ALL | wx.EXPAND, 8)
|
|
||||||
|
|
||||||
input_sizer = wx.BoxSizer(wx.HORIZONTAL)
|
|
||||||
self.channel_input = wx.TextCtrl(panel, style=wx.TE_PROCESS_ENTER)
|
|
||||||
self.channel_input.SetHint("#channel-name")
|
|
||||||
add_btn = wx.Button(panel, label="Add")
|
|
||||||
input_sizer.Add(self.channel_input, 1, wx.RIGHT, 4)
|
|
||||||
input_sizer.Add(add_btn, 0)
|
|
||||||
main_sizer.Add(input_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT, 8)
|
|
||||||
|
|
||||||
remove_btn = wx.Button(panel, label="Remove Selected")
|
|
||||||
reset_btn = wx.Button(panel, label="Reset to #lobby")
|
|
||||||
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
|
|
||||||
btn_sizer.Add(remove_btn, 1, wx.RIGHT, 4)
|
|
||||||
btn_sizer.Add(reset_btn, 1)
|
|
||||||
main_sizer.Add(btn_sizer, 0, wx.EXPAND | wx.LEFT | wx.RIGHT | wx.TOP, 8)
|
|
||||||
|
|
||||||
button_bar = wx.StdDialogButtonSizer()
|
|
||||||
ok_btn = wx.Button(panel, wx.ID_OK)
|
|
||||||
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
|
|
||||||
button_bar.AddButton(ok_btn)
|
|
||||||
button_bar.AddButton(cancel_btn)
|
|
||||||
button_bar.Realize()
|
|
||||||
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 8)
|
|
||||||
|
|
||||||
panel.SetSizer(main_sizer)
|
|
||||||
|
|
||||||
# Bindings
|
|
||||||
add_btn.Bind(wx.EVT_BUTTON, self.on_add)
|
|
||||||
remove_btn.Bind(wx.EVT_BUTTON, self.on_remove)
|
|
||||||
reset_btn.Bind(wx.EVT_BUTTON, self.on_reset)
|
|
||||||
self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_add)
|
|
||||||
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
|
|
||||||
|
|
||||||
def get_channels(self):
|
|
||||||
return [self.list_box.GetString(i) for i in range(self.list_box.GetCount())]
|
|
||||||
|
|
||||||
def on_add(self, event):
|
|
||||||
value = self.channel_input.GetValue().strip()
|
|
||||||
if not value:
|
|
||||||
return
|
|
||||||
if not value.startswith('#'):
|
|
||||||
value = f"#{value}"
|
|
||||||
if not self._is_valid_channel(value):
|
|
||||||
wx.MessageBox(
|
|
||||||
"Channel names may contain letters, numbers, -, and _ only.",
|
|
||||||
"Invalid Channel",
|
|
||||||
wx.OK | wx.ICON_WARNING,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
if self.list_box.FindString(value) != wx.NOT_FOUND:
|
|
||||||
wx.MessageBox("Channel already exists.", "Duplicate Channel", wx.OK | wx.ICON_INFORMATION)
|
|
||||||
return
|
|
||||||
self.list_box.Append(value)
|
|
||||||
self.channel_input.Clear()
|
|
||||||
|
|
||||||
def on_remove(self, event):
|
|
||||||
selection = self.list_box.GetSelection()
|
|
||||||
if selection != wx.NOT_FOUND:
|
|
||||||
self.list_box.Delete(selection)
|
|
||||||
|
|
||||||
def on_reset(self, event):
|
|
||||||
self.list_box.Clear()
|
|
||||||
self.list_box.Append("#lobby")
|
|
||||||
|
|
||||||
def on_ok(self, event):
|
|
||||||
if self.list_box.GetCount() == 0:
|
|
||||||
wx.MessageBox("Add at least one channel before saving.", "No Channels", wx.OK | wx.ICON_INFORMATION)
|
|
||||||
return
|
|
||||||
event.Skip()
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _is_valid_channel(name):
|
|
||||||
if len(name) < 2:
|
|
||||||
return False
|
|
||||||
allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_#"
|
|
||||||
return all(ch in allowed for ch in name)
|
|
||||||
|
|
||||||
|
|
||||||
class InterfaceSelectDialog(wx.Dialog):
|
|
||||||
"""Dialog that lets the user pick which local interface the server should bind to."""
|
|
||||||
|
|
||||||
def __init__(self, parent, current_host="127.0.0.1"):
|
|
||||||
super().__init__(parent, title="Select Network Interface", size=(420, 380))
|
|
||||||
try:
|
|
||||||
self.SetIcon(parent.GetIcon())
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
self.selected_host = current_host
|
|
||||||
panel = wx.Panel(self)
|
|
||||||
panel.SetBackgroundColour(parent.theme["window_bg"])
|
|
||||||
main_sizer = wx.BoxSizer(wx.VERTICAL)
|
|
||||||
|
|
||||||
info = wx.StaticText(
|
|
||||||
panel,
|
|
||||||
label="Choose the Network interface where your server should run:\n"
|
|
||||||
"You are EXPOSING a Server to your LOCAL Network, this may give away who you are!\n",
|
|
||||||
)
|
|
||||||
info.Wrap(380)
|
|
||||||
main_sizer.Add(info, 0, wx.ALL | wx.EXPAND, 10)
|
|
||||||
|
|
||||||
self.interface_list = wx.ListCtrl(panel, style=wx.LC_REPORT | wx.BORDER_SUNKEN)
|
|
||||||
self.interface_list.InsertColumn(0, "Interface", width=180)
|
|
||||||
self.interface_list.InsertColumn(1, "Address", width=180)
|
|
||||||
|
|
||||||
self.interfaces = self._gather_interfaces()
|
|
||||||
current_index = 0
|
|
||||||
for idx, entry in enumerate[tuple[str, str]](self.interfaces):
|
|
||||||
name, address = entry
|
|
||||||
self.interface_list.InsertItem(idx, name)
|
|
||||||
self.interface_list.SetItem(idx, 1, address)
|
|
||||||
if address == current_host:
|
|
||||||
current_index = idx
|
|
||||||
self.interface_list.Select(current_index)
|
|
||||||
self.interface_list.EnsureVisible(current_index)
|
|
||||||
if self.interfaces:
|
|
||||||
self.selected_host = self.interfaces[current_index][1]
|
|
||||||
self.interface_list.Bind(wx.EVT_LIST_ITEM_SELECTED, self.on_select)
|
|
||||||
self.interface_list.Bind(wx.EVT_LIST_ITEM_ACTIVATED, self.on_activate)
|
|
||||||
main_sizer.Add(self.interface_list, 1, wx.ALL | wx.EXPAND, 10)
|
|
||||||
|
|
||||||
button_bar = wx.StdDialogButtonSizer()
|
|
||||||
ok_btn = wx.Button(panel, wx.ID_OK)
|
|
||||||
cancel_btn = wx.Button(panel, wx.ID_CANCEL)
|
|
||||||
button_bar.AddButton(ok_btn)
|
|
||||||
button_bar.AddButton(cancel_btn)
|
|
||||||
button_bar.Realize()
|
|
||||||
main_sizer.Add(button_bar, 0, wx.ALL | wx.EXPAND, 10)
|
|
||||||
|
|
||||||
ok_btn.Bind(wx.EVT_BUTTON, self.on_ok)
|
|
||||||
|
|
||||||
panel.SetSizer(main_sizer)
|
|
||||||
|
|
||||||
def on_select(self, event):
|
|
||||||
index = event.GetIndex()
|
|
||||||
_, address = self.interfaces[index]
|
|
||||||
self.selected_host = address
|
|
||||||
|
|
||||||
def _gather_interfaces(self):
|
|
||||||
entries = [
|
|
||||||
("Loopback only", "127.0.0.1"),
|
|
||||||
("All interfaces", "0.0.0.0"),
|
|
||||||
]
|
|
||||||
try:
|
|
||||||
import psutil
|
|
||||||
|
|
||||||
seen = {addr for _, addr in entries}
|
|
||||||
for name, addrs in psutil.net_if_addrs().items():
|
|
||||||
for addr in addrs:
|
|
||||||
if addr.family == socket.AF_INET and addr.address not in seen:
|
|
||||||
label = f"{name}"
|
|
||||||
entries.append((label, addr.address))
|
|
||||||
seen.add(addr.address)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Unable to enumerate network interfaces: {e}")
|
|
||||||
|
|
||||||
return entries
|
|
||||||
|
|
||||||
def get_selected_host(self):
|
|
||||||
return self.selected_host
|
|
||||||
|
|
||||||
def on_activate(self, event):
|
|
||||||
self.on_select(event)
|
|
||||||
self.EndModal(wx.ID_OK)
|
|
||||||
|
|
||||||
def on_ok(self, event):
|
|
||||||
index = self.interface_list.GetFirstSelected()
|
|
||||||
if index == -1 and self.interfaces:
|
|
||||||
wx.MessageBox("Select an interface before starting the server.", "No Interface Selected", wx.OK | wx.ICON_INFORMATION)
|
|
||||||
return
|
|
||||||
if index != -1:
|
|
||||||
_, address = self.interfaces[index]
|
|
||||||
self.selected_host = address
|
|
||||||
event.Skip()
|
|
||||||
|
|
||||||
|
|
||||||
class IRCFrame(wx.Frame):
|
class IRCFrame(wx.Frame):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__(None, title="wxIRC", size=(1200, 700))
|
super().__init__(None, title="wxIRC", size=(1200, 700))
|
||||||
|
|||||||
Reference in New Issue
Block a user