import wx import irc.client import threading import re import time import traceback from collections import defaultdict from datetime import datetime import socket import logging import queue import os # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class UIUpdate: """Thread-safe UI update container""" def __init__(self, callback, *args, **kwargs): self.callback = callback self.args = args self.kwargs = kwargs class SearchDialog(wx.Dialog): def __init__(self, parent): super().__init__(parent, title="Search", style=wx.DEFAULT_DIALOG_STYLE) self.parent = parent sizer = wx.BoxSizer(wx.VERTICAL) # Search input search_sizer = wx.BoxSizer(wx.HORIZONTAL) search_sizer.Add(wx.StaticText(self, label="Search:"), 0, wx.ALIGN_CENTER_VERTICAL | wx.ALL, 5) self.search_ctrl = wx.TextCtrl(self, size=(200, -1)) self.search_ctrl.Bind(wx.EVT_TEXT_ENTER, self.on_search) search_sizer.Add(self.search_ctrl, 1, wx.EXPAND | wx.ALL, 5) sizer.Add(search_sizer, 0, wx.EXPAND) # Options options_sizer = wx.BoxSizer(wx.HORIZONTAL) self.case_sensitive = wx.CheckBox(self, label="Case sensitive") self.whole_word = wx.CheckBox(self, label="Whole word") options_sizer.Add(self.case_sensitive, 0, wx.ALL, 5) options_sizer.Add(self.whole_word, 0, wx.ALL, 5) sizer.Add(options_sizer, 0, wx.EXPAND) # Buttons btn_sizer = wx.StdDialogButtonSizer() self.search_btn = wx.Button(self, wx.ID_OK, "Search") self.search_btn.SetDefault() self.search_btn.Bind(wx.EVT_BUTTON, self.on_search) btn_sizer.AddButton(self.search_btn) close_btn = wx.Button(self, wx.ID_CANCEL, "Close") btn_sizer.AddButton(close_btn) btn_sizer.Realize() sizer.Add(btn_sizer, 0, wx.ALIGN_CENTER | wx.ALL, 10) self.SetSizer(sizer) self.Fit() self.search_ctrl.SetFocus() def on_search(self, event): search_text = self.search_ctrl.GetValue().strip() if search_text: self.parent.perform_search( search_text, self.case_sensitive.IsChecked(), self.whole_word.IsChecked() ) class IRCPanel(wx.Panel): def __init__(self, parent, main_frame): super().__init__(parent) self.parent = parent self.main_frame = main_frame self.messages = [] sizer = wx.BoxSizer(wx.VERTICAL) # Use a better font for chat self.text_ctrl = wx.TextCtrl(self, style=wx.TE_MULTILINE | wx.TE_READONLY | wx.TE_RICH2 | wx.TE_AUTO_URL) self.text_ctrl.SetBackgroundColour(wx.Colour(30, 30, 30)) self.text_ctrl.SetForegroundColour(wx.Colour(220, 220, 220)) # Load Fira Code font self.font = self.load_fira_code_font() self.text_ctrl.SetFont(self.font) sizer.Add(self.text_ctrl, 1, wx.EXPAND | wx.ALL, 0) input_sizer = wx.BoxSizer(wx.HORIZONTAL) self.input_ctrl = wx.TextCtrl(self, style=wx.TE_PROCESS_ENTER) self.input_ctrl.SetHint("Type message here …") self.input_ctrl.Bind(wx.EVT_TEXT_ENTER, self.on_send) self.input_ctrl.Bind(wx.EVT_KEY_DOWN, self.on_key_down) send_btn = wx.Button(self, label="Send") send_btn.SetToolTip("Send message (Enter)") send_btn.Bind(wx.EVT_BUTTON, self.on_send) input_sizer.Add(self.input_ctrl, 1, wx.EXPAND | wx.ALL, 2) input_sizer.Add(send_btn, 0, wx.ALL, 2) sizer.Add(input_sizer, 0, wx.EXPAND | wx.ALL, 0) self.SetSizer(sizer) self.target = None self.history = [] self.history_pos = -1 # Search state self.search_text = "" self.search_positions = [] self.current_search_index = -1 # Bind Ctrl+F for search self.text_ctrl.Bind(wx.EVT_KEY_DOWN, self.on_text_key_down) accel_tbl = wx.AcceleratorTable([(wx.ACCEL_CTRL, ord('F'), wx.ID_FIND)]) self.SetAcceleratorTable(accel_tbl) self.Bind(wx.EVT_MENU, self.on_search, id=wx.ID_FIND) def load_fira_code_font(self): """Load Fira Code font from current directory""" try: # First try to add the font to the system and use it font_path = "FiraCode-Regular.ttf" if os.path.exists(font_path): # Try to load the font file directly font = wx.Font(10, wx.FONTFAMILY_TELETYPE, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL) # On some systems, we might need to use the font by name after ensuring it's available if wx.TheFontList.FindOrCreateFont(10, wx.FONTFAMILY_TELETYPE, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL, False, "Fira Code"): font = wx.Font(10, wx.FONTFAMILY_TELETYPE, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL, False, "Fira Code") logger.info("Successfully loaded Fira Code font") else: # Try creating font from the file path (this works on some wxPython versions) font = wx.Font(wx.FontInfo(10).Family(wx.FONTFAMILY_TELETYPE).FaceName("Fira Code")) logger.info("Using Fira Code font via FaceName") return font else: logger.warning("FiraCode-Regular.ttf not found in current directory") except Exception as e: logger.error(f"Error loading Fira Code font: {e}") # Fall back to system monospace font font = wx.Font(10, wx.FONTFAMILY_TELETYPE, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL) logger.info("Using system monospace font as fallback") return font def set_target(self, target): self.target = target def add_message(self, message, username=None, username_color=None, message_color=None, bold=False, italic=False, underline=False): """Thread-safe message addition with username coloring""" try: # Use CallAfter for thread safety if wx.IsMainThread(): self._add_message_safe(message, username, username_color, message_color, bold, italic, underline) else: wx.CallAfter(self._add_message_safe, message, username, username_color, message_color, bold, italic, underline) except Exception as e: logger.error(f"Error in add_message: {e}") def _add_message_safe(self, message, username=None, username_color=None, message_color=None, bold=False, italic=False, underline=False): """Actually add message - must be called from main thread""" try: self.messages.append(message) # Save current position for formatting start_pos = self.text_ctrl.GetLastPosition() if username and username_color: # Add username with its color attr = wx.TextAttr() attr.SetTextColour(username_color) if bold: attr.SetFontWeight(wx.FONTWEIGHT_BOLD) if italic: attr.SetFontStyle(wx.FONTSTYLE_ITALIC) if underline: attr.SetFontUnderlined(True) attr.SetFont(self.font) self.text_ctrl.SetDefaultStyle(attr) self.text_ctrl.AppendText(username) # Add the rest of the message with message color attr = wx.TextAttr() if message_color: attr.SetTextColour(message_color) else: attr.SetTextColour(wx.Colour(220, 220, 220)) attr.SetFont(self.font) self.text_ctrl.SetDefaultStyle(attr) # Append the message (without username if we already added it) if username and username_color: # Find the message part after username message_text = message[message.find(username) + len(username):] self.text_ctrl.AppendText(message_text + "\n") else: self.text_ctrl.AppendText(message + "\n") # Auto-scroll to bottom self.text_ctrl.ShowPosition(self.text_ctrl.GetLastPosition()) except Exception as e: logger.error(f"Error adding message safely: {e}") def add_formatted_message(self, timestamp, username, content, username_color=None, is_action=False): """Add a formatted message with colored username""" try: if is_action: message = f"{timestamp}* {username} {content}" self.add_message(message, f"* {username}", username_color, wx.Colour(255, 150, 255), italic=True) else: message = f"{timestamp}<{username}> {content}" self.add_message(message, f"<{username}>", username_color, wx.Colour(220, 220, 220)) except Exception as e: logger.error(f"Error adding formatted message: {e}") def add_system_message(self, message, color=None, bold=False): """Add system message without username coloring""" try: if color is None: color = wx.Colour(180, 180, 255) self.add_message(message, None, None, color, bold, False, False) except Exception as e: logger.error(f"Error adding system message: {e}") def on_text_key_down(self, event): """Handle key events in the text control""" keycode = event.GetKeyCode() if event.ControlDown() and keycode == ord('F'): self.on_search(event) else: event.Skip() def on_search(self, event): """Open search dialog""" try: dlg = SearchDialog(self) dlg.ShowModal() dlg.Destroy() except Exception as e: logger.error(f"Error in search: {e}") def perform_search(self, search_text, case_sensitive=False, whole_word=False): """Perform text search in the chat history""" try: self.search_text = search_text self.search_positions = [] self.current_search_index = -1 # Get all text full_text = self.text_ctrl.GetValue() if not full_text: return # Prepare search parameters flags = 0 if not case_sensitive: flags |= wx.FR_DOWN if whole_word: # For whole word, we'll handle manually pass # Find all occurrences pos = 0 while pos != -1: if whole_word: # Manual whole word search found_pos = full_text.find(search_text, pos) if found_pos == -1: break # Check if it's a whole word if (found_pos == 0 or not full_text[found_pos-1].isalnum()) and \ (found_pos + len(search_text) >= len(full_text) or not full_text[found_pos + len(search_text)].isalnum()): self.search_positions.append(found_pos) pos = found_pos + 1 else: found_pos = self.text_ctrl.FindText(pos, len(full_text), search_text, flags) if found_pos == -1: break self.search_positions.append(found_pos) pos = found_pos + len(search_text) if self.search_positions: self.current_search_index = 0 self.highlight_search_result() self.main_frame.SetStatusText(f"Found {len(self.search_positions)} occurrences of '{search_text}'") else: self.main_frame.SetStatusText(f"Text '{search_text}' not found") wx.Bell() except Exception as e: logger.error(f"Error performing search: {e}") def highlight_search_result(self): """Highlight the current search result""" try: if not self.search_positions or self.current_search_index < 0: return pos = self.search_positions[self.current_search_index] # Select the found text self.text_ctrl.SetSelection(pos, pos + len(self.search_text)) self.text_ctrl.ShowPosition(pos) # Update status self.main_frame.SetStatusText( f"Found {self.current_search_index + 1} of {len(self.search_positions)}: '{self.search_text}'" ) except Exception as e: logger.error(f"Error highlighting search result: {e}") def find_next(self): """Find next occurrence""" if self.search_positions: self.current_search_index = (self.current_search_index + 1) % len(self.search_positions) self.highlight_search_result() def find_previous(self): """Find previous occurrence""" if self.search_positions: self.current_search_index = (self.current_search_index - 1) % len(self.search_positions) self.highlight_search_result() def on_key_down(self, event): try: keycode = event.GetKeyCode() if keycode == wx.WXK_UP: if self.history and self.history_pos < len(self.history) - 1: self.history_pos += 1 self.input_ctrl.SetValue(self.history[-(self.history_pos + 1)]) elif keycode == wx.WXK_DOWN: if self.history_pos > 0: self.history_pos -= 1 self.input_ctrl.SetValue(self.history[-(self.history_pos + 1)]) elif self.history_pos == 0: self.history_pos = -1 self.input_ctrl.Clear() elif keycode == wx.WXK_TAB: # Tab completion for nicknames self.handle_tab_completion() return # Don't skip to prevent default tab behavior elif keycode == wx.WXK_F3: # F3 for find next if self.search_positions: self.find_next() return elif event.ShiftDown() and keycode == wx.WXK_F3: # Shift+F3 for find previous if self.search_positions: self.find_previous() return else: event.Skip() except Exception as e: logger.error(f"Error in key handler: {e}") event.Skip() def handle_tab_completion(self): """Handle tab completion for nicknames""" try: current_text = self.input_ctrl.GetValue() if not current_text or not self.target or not self.target.startswith('#'): return users = self.main_frame.channel_users.get(self.target, []) if not users: return # Find word at cursor position pos = self.input_ctrl.GetInsertionPoint() text_before = current_text[:pos] words = text_before.split() if not words: return current_word = words[-1] # Find matching nicks matches = [user for user in users if user.lower().startswith(current_word.lower())] if matches: if len(matches) == 1: # Single match - complete it new_word = matches[0] if ':' in text_before or text_before.strip().endswith(current_word): # Replace the current word new_text = text_before[:-len(current_word)] + new_word + current_text[pos:] self.input_ctrl.SetValue(new_text) self.input_ctrl.SetInsertionPoint(pos - len(current_word) + len(new_word)) else: # Multiple matches - show in status self.main_frame.SetStatusText(f"Tab completion: {', '.join(matches[:5])}{'...' if len(matches) > 5 else ''}") except Exception as e: logger.error(f"Error in tab completion: {e}") def on_send(self, event): try: message = self.input_ctrl.GetValue().strip() if message and self.target: self.history.append(message) self.history_pos = -1 self.main_frame.send_message(self.target, message) self.input_ctrl.Clear() except Exception as e: logger.error(f"Error sending message: {e}") class IRCFrame(wx.Frame): def __init__(self): super().__init__(None, title="IRC Client", size=(1200, 700)) self.reactor = None self.connection = None self.reactor_thread = None self.connection_lock = threading.RLock() self.is_connecting = False self.is_disconnecting = False self.ui_update_queue = queue.Queue() self.ui_timer = None self.channels = {} self.channel_users = defaultdict(list) self.current_channel = None self.nick = "" self.server = "" self.port = 6667 self.highlights = [] self.auto_join_channels = [] self.away = False self.timestamps = True # User color mapping self.user_colors = {} self.available_colors = [ wx.Colour(255, 150, 150), # Light red wx.Colour(150, 255, 150), # Light green wx.Colour(150, 200, 255), # Light blue wx.Colour(255, 255, 150), # Light yellow wx.Colour(255, 150, 255), # Light magenta wx.Colour(150, 255, 255), # Light cyan wx.Colour(255, 200, 150), # Light orange wx.Colour(200, 150, 255), # Light purple wx.Colour(255, 200, 200), # Pink wx.Colour(200, 255, 200), # Mint wx.Colour(200, 200, 255), # Lavender wx.Colour(255, 255, 200), # Cream ] self.color_index = 0 self.setup_irc_handlers() self.create_menubar() self.setup_ui() # Start UI update timer self.ui_timer = wx.Timer(self) self.Bind(wx.EVT_TIMER, self.process_ui_updates, self.ui_timer) self.ui_timer.Start(50) # Process UI updates every 50ms self.CreateStatusBar() self.SetStatusText("Not connected") self.Centre() self.Show() self.Bind(wx.EVT_CLOSE, self.on_close) # Bind global accelerators for search accel_tbl = wx.AcceleratorTable([ (wx.ACCEL_CTRL, ord('F'), wx.ID_FIND), (wx.ACCEL_NORMAL, wx.WXK_F3, 1001), (wx.ACCEL_SHIFT, wx.WXK_F3, 1002), ]) self.SetAcceleratorTable(accel_tbl) self.Bind(wx.EVT_MENU, self.on_global_search, id=wx.ID_FIND) self.Bind(wx.EVT_MENU, self.on_find_next, id=1001) self.Bind(wx.EVT_MENU, self.on_find_previous, id=1002) def get_user_color(self, username): """Get a consistent color for a username""" if username not in self.user_colors: self.user_colors[username] = self.available_colors[self.color_index] self.color_index = (self.color_index + 1) % len(self.available_colors) return self.user_colors[username] def on_global_search(self, event): """Handle global Ctrl+F search""" current_panel = self.get_current_panel() if current_panel: current_panel.on_search(event) def on_find_next(self, event): """Handle F3 for find next""" current_panel = self.get_current_panel() if current_panel and hasattr(current_panel, 'find_next'): current_panel.find_next() def on_find_previous(self, event): """Handle Shift+F3 for find previous""" current_panel = self.get_current_panel() if current_panel and hasattr(current_panel, 'find_previous'): current_panel.find_previous() def get_current_panel(self): """Get the currently active IRC panel""" try: current_page = self.notebook.GetCurrentPage() if isinstance(current_page, IRCPanel): return current_page except: pass return None def setup_ui(self): """Setup UI components""" panel = wx.Panel(self) main_sizer = wx.BoxSizer(wx.HORIZONTAL) # Left sidebar left_panel = wx.Panel(panel) left_panel.SetBackgroundColour(wx.Colour(45, 45, 45)) left_sizer = wx.BoxSizer(wx.VERTICAL) conn_box = wx.StaticBox(left_panel, label="Connection") conn_box.SetToolTip("Configure IRC server connection") conn_box_sizer = wx.StaticBoxSizer(conn_box, wx.VERTICAL) conn_box_sizer.Add(wx.StaticText(conn_box, label="Server:"), 0, wx.ALL, 2) self.server_ctrl = wx.TextCtrl(conn_box, value="irc.libera.chat") self.server_ctrl.SetToolTip("IRC server address") conn_box_sizer.Add(self.server_ctrl, 0, wx.EXPAND | wx.ALL, 2) conn_box_sizer.Add(wx.StaticText(conn_box, label="Port:"), 0, wx.ALL, 2) self.port_ctrl = wx.TextCtrl(conn_box, value="6667") self.port_ctrl.SetToolTip("IRC server port (usually 6667)") conn_box_sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 2) conn_box_sizer.Add(wx.StaticText(conn_box, label="Nick:"), 0, wx.ALL, 2) self.nick_ctrl = wx.TextCtrl(conn_box, value="wxircuser") self.nick_ctrl.SetToolTip("Your nickname on IRC") conn_box_sizer.Add(self.nick_ctrl, 0, wx.EXPAND | wx.ALL, 2) self.connect_btn = wx.Button(conn_box, label="Connect") self.connect_btn.SetToolTip("Connect to IRC server") self.connect_btn.Bind(wx.EVT_BUTTON, self.on_connect) conn_box_sizer.Add(self.connect_btn, 0, wx.EXPAND | wx.ALL, 5) left_sizer.Add(conn_box_sizer, 0, wx.EXPAND | wx.ALL, 5) # Channel management chan_box = wx.StaticBox(left_panel, label="Channels") chan_box.SetToolTip("Join and manage channels") chan_box_sizer = wx.StaticBoxSizer(chan_box, wx.VERTICAL) join_sizer = wx.BoxSizer(wx.HORIZONTAL) self.channel_input = wx.TextCtrl(chan_box, style=wx.TE_PROCESS_ENTER) self.channel_input.SetHint("Enter channel name …") self.channel_input.SetToolTip("Type channel name …") self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_join_channel) join_btn = wx.Button(chan_box, label="Join") join_btn.SetToolTip("Join channel") join_btn.Bind(wx.EVT_BUTTON, self.on_join_channel) join_sizer.Add(self.channel_input, 1, wx.EXPAND | wx.ALL, 2) join_sizer.Add(join_btn, 0, wx.ALL, 2) self.channel_list = wx.ListBox(chan_box) self.channel_list.SetToolTip("Joined channels") self.channel_list.Bind(wx.EVT_LISTBOX, self.on_channel_select) self.channel_list.Bind(wx.EVT_RIGHT_DOWN, self.on_channel_right_click) chan_box_sizer.Add(join_sizer, 0, wx.EXPAND | wx.ALL, 2) chan_box_sizer.Add(self.channel_list, 1, wx.EXPAND | wx.ALL, 2) left_sizer.Add(chan_box_sizer, 1, wx.EXPAND | wx.ALL, 5) left_panel.SetSizer(left_sizer) # Center - Notebook self.notebook = wx.Notebook(panel) # Server panel server_panel = IRCPanel(self.notebook, self) server_panel.set_target("SERVER") self.notebook.AddPage(server_panel, "Server") self.channels["SERVER"] = server_panel # Right sidebar - Users right_panel = wx.Panel(panel) right_panel.SetBackgroundColour(wx.Colour(45, 45, 45)) right_sizer = wx.BoxSizer(wx.VERTICAL) users_box = wx.StaticBox(right_panel, label="Users") users_box_sizer = wx.StaticBoxSizer(users_box, wx.VERTICAL) self.users_list = wx.ListBox(users_box) self.users_list.Bind(wx.EVT_LISTBOX_DCLICK, self.on_user_dclick) self.users_list.Bind(wx.EVT_RIGHT_DOWN, self.on_user_right_click) users_box_sizer.Add(self.users_list, 1, wx.EXPAND | wx.ALL, 2) right_sizer.Add(users_box_sizer, 1, wx.EXPAND | wx.ALL, 5) right_panel.SetSizer(right_sizer) # Add to main sizer main_sizer.Add(left_panel, 0, wx.EXPAND | wx.ALL, 0) main_sizer.Add(self.notebook, 1, wx.EXPAND | wx.ALL, 0) main_sizer.Add(right_panel, 0, wx.EXPAND | wx.ALL, 0) left_panel.SetMinSize((220, -1)) right_panel.SetMinSize((180, -1)) panel.SetSizer(main_sizer) def process_ui_updates(self, event): """Process queued UI updates from timer event""" try: while True: try: update = self.ui_update_queue.get_nowait() if update and update.callback: update.callback(*update.args, **update.kwargs) except queue.Empty: break except Exception as e: logger.error(f"Error processing UI updates: {e}") def safe_ui_update(self, callback, *args, **kwargs): """Safely update UI from any thread""" try: if wx.IsMainThread(): callback(*args, **kwargs) else: self.ui_update_queue.put(UIUpdate(callback, *args, **kwargs)) except Exception as e: logger.error(f"Error in safe_ui_update: {e}") def create_menubar(self): try: menubar = wx.MenuBar() # File menu file_menu = wx.Menu() file_menu.Append(wx.ID_EXIT, "E&xit\tCtrl+Q") self.Bind(wx.EVT_MENU, self.on_close, id=wx.ID_EXIT) # Edit menu with search edit_menu = wx.Menu() edit_menu.Append(wx.ID_FIND, "&Find\tCtrl+F") self.Bind(wx.EVT_MENU, self.on_global_search, id=wx.ID_FIND) edit_menu.Append(1001, "Find &Next\tF3") edit_menu.Append(1002, "Find &Previous\tShift+F3") self.Bind(wx.EVT_MENU, self.on_find_next, id=1001) self.Bind(wx.EVT_MENU, self.on_find_previous, id=1002) # Channel menu channel_menu = wx.Menu() channel_menu.Append(101, "&Join Channel\tCtrl+J") channel_menu.Append(102, "&Part Channel\tCtrl+P") channel_menu.AppendSeparator() channel_menu.Append(103, "Close &Tab\tCtrl+W") self.Bind(wx.EVT_MENU, self.on_menu_join, id=101) self.Bind(wx.EVT_MENU, self.on_menu_part, id=102) self.Bind(wx.EVT_MENU, self.on_menu_close_tab, id=103) # Tools menu tools_menu = wx.Menu() tools_menu.Append(201, "&WHOIS User\tCtrl+I") tools_menu.Append(202, "Change &Nick\tCtrl+N") tools_menu.AppendSeparator() self.away_item = tools_menu.AppendCheckItem(203, "&Away\tCtrl+A") self.timestamp_item = tools_menu.AppendCheckItem(204, "Show &Timestamps") self.timestamp_item.Check(True) tools_menu.AppendSeparator() tools_menu.Append(205, "Set &Highlights") tools_menu.Append(206, "Auto-join Channels") tools_menu.Append(207, "Command Help") self.Bind(wx.EVT_MENU, self.on_menu_whois, id=201) self.Bind(wx.EVT_MENU, self.on_menu_change_nick, id=202) self.Bind(wx.EVT_MENU, self.on_menu_away, id=203) self.Bind(wx.EVT_MENU, self.on_menu_timestamps, id=204) self.Bind(wx.EVT_MENU, self.on_menu_highlights, id=205) self.Bind(wx.EVT_MENU, self.on_menu_autojoin, id=206) self.Bind(wx.EVT_MENU, self.on_menu_help, id=207) menubar.Append(file_menu, "&File") menubar.Append(edit_menu, "&Edit") menubar.Append(channel_menu, "&Channel") menubar.Append(tools_menu, "&Tools") self.SetMenuBar(menubar) except Exception as e: logger.error(f"Error creating menu: {e}") def setup_irc_handlers(self): try: self.reactor = irc.client.Reactor() self.reactor.add_global_handler("all_events", self.on_all_events) # Catch all events self.reactor.add_global_handler("welcome", self.on_welcome) self.reactor.add_global_handler("join", self.on_join) self.reactor.add_global_handler("part", self.on_part) self.reactor.add_global_handler("quit", self.on_quit) self.reactor.add_global_handler("pubmsg", self.on_pubmsg) self.reactor.add_global_handler("privmsg", self.on_privmsg) self.reactor.add_global_handler("namreply", self.on_namreply) self.reactor.add_global_handler("nick", self.on_nick) self.reactor.add_global_handler("mode", self.on_mode) self.reactor.add_global_handler("notice", self.on_notice) self.reactor.add_global_handler("disconnect", self.on_disconnect) self.reactor.add_global_handler("topic", self.on_topic) self.reactor.add_global_handler("kick", self.on_kick) self.reactor.add_global_handler("whoisuser", self.on_whoisuser) self.reactor.add_global_handler("whoischannels", self.on_whoischannels) self.reactor.add_global_handler("whoisserver", self.on_whoisserver) except Exception as e: logger.error(f"Error setting up IRC handlers: {e}") def on_all_events(self, connection, event): """Catch-all handler to log ALL server events in the Server tab""" try: # Don't log certain very frequent events to avoid spam if event.type in ("pubmsg", "privmsg", "action"): return # Format the raw event for display event_info = f"RAW: {event.type.upper()}" if hasattr(event, 'source') and event.source: event_info += f" from {event.source}" if hasattr(event, 'target') and event.target: event_info += f" to {event.target}" if hasattr(event, 'arguments') and event.arguments: event_info += f" - {' '.join(event.arguments)}" self.log_server(event_info, wx.Colour(180, 180, 255), italic=True) except Exception as e: logger.error(f"Error in all_events handler: {e}") def get_timestamp(self): try: if self.timestamps: return f"[{datetime.now().strftime('%H:%M:%S')}] " return "" except Exception as e: logger.error(f"Error getting timestamp: {e}") return "" def is_connected(self): """Thread-safe connection check""" with self.connection_lock: return self.connection and self.connection.is_connected() def on_connect(self, event): try: if not self.is_connected() and not self.is_connecting: self.is_connecting = True self.server = self.server_ctrl.GetValue() try: self.port = int(self.port_ctrl.GetValue()) except ValueError: self.safe_ui_update(self.log_server, "Invalid port number", wx.Colour(255, 100, 100)) self.is_connecting = False return self.nick = self.nick_ctrl.GetValue() if not self.server or not self.nick: self.safe_ui_update(self.log_server, "Server and nick are required", wx.Colour(255, 100, 100)) self.is_connecting = False return self.safe_ui_update(self.connect_btn.Enable, False) self.safe_ui_update(self.log_server, f"Connecting to {self.server}:{self.port} as {self.nick}...", wx.Colour(150, 150, 255)) def connect_thread(): try: logger.info(f"Attempting connection to {self.server}:{self.port}") self.connection = self.reactor.server().connect( self.server, self.port, self.nick, username=self.nick, ircname="wxPython IRC Client", connect_factory=irc.connection.Factory() ) self.reactor_thread = threading.Thread( target=self.safe_reactor_loop, name="IRC-Reactor", daemon=True ) self.reactor_thread.start() self.safe_ui_update(self.on_connect_success) except irc.client.ServerConnectionError as e: error_msg = f"Connection error: {e}" logger.error(error_msg) self.safe_ui_update(self.on_connect_failed, error_msg) except socket.gaierror as e: error_msg = f"DNS resolution failed: {e}" logger.error(error_msg) self.safe_ui_update(self.on_connect_failed, error_msg) except Exception as e: error_msg = f"Unexpected connection error: {e}" logger.error(error_msg) self.safe_ui_update(self.on_connect_failed, error_msg) threading.Thread(target=connect_thread, name="IRC-Connect", daemon=True).start() elif self.is_connected(): self.disconnect() except Exception as e: logger.error(f"Error in connect handler: {e}") self.is_connecting = False self.safe_ui_update(self.connect_btn.Enable, True) def safe_reactor_loop(self): """Safely run the reactor loop with exception handling""" try: self.reactor.process_forever() except Exception as e: logger.error(f"Reactor loop error: {e}") self.safe_ui_update(self.log_server, f"Connection error: {e}", wx.Colour(255, 100, 100)) self.safe_ui_update(self.on_disconnect_cleanup) def on_connect_success(self): """Handle successful connection""" self.is_connecting = False self.connect_btn.SetLabel("Disconnect") self.connect_btn.Enable(True) self.SetStatusText(f"Connected to {self.server} - Use /help for commands, Ctrl+F to search") logger.info(f"Successfully connected to {self.server}") def on_connect_failed(self, error_msg): """Handle connection failure""" self.is_connecting = False self.log_server(error_msg, wx.Colour(255, 100, 100)) self.connect_btn.Enable(True) self.SetStatusText("Connection failed") logger.error(f"Connection failed: {error_msg}") def disconnect(self): """Safely disconnect from IRC server""" try: with self.connection_lock: if self.connection and self.connection.is_connected(): self.is_disconnecting = True self.connection.quit("Goodbye") # Give it a moment to send the quit message threading.Timer(1.0, self.force_disconnect).start() else: self.on_disconnect_cleanup() except Exception as e: logger.error(f"Error during disconnect: {e}") self.on_disconnect_cleanup() def force_disconnect(self): """Force disconnect if graceful quit fails""" try: with self.connection_lock: if self.connection: self.connection.close() self.on_disconnect_cleanup() except Exception as e: logger.error(f"Error during force disconnect: {e}") self.on_disconnect_cleanup() def on_disconnect_cleanup(self): """Clean up after disconnect""" with self.connection_lock: self.connection = None self.is_connecting = False self.is_disconnecting = False self.safe_ui_update(self.connect_btn.SetLabel, "Connect") self.safe_ui_update(self.connect_btn.Enable, True) self.safe_ui_update(self.SetStatusText, "Disconnected") self.safe_ui_update(self.log_server, "Disconnected from server", wx.Colour(255, 100, 100)) def on_join_channel(self, event): try: channel = self.channel_input.GetValue().strip() if channel and self.is_connected(): if not channel.startswith('#'): channel = '#' + channel self.connection.join(channel) self.channel_input.Clear() except Exception as e: logger.error(f"Error joining channel: {e}") self.safe_ui_update(self.log_server, f"Error joining channel: {e}", wx.Colour(255, 100, 100)) def on_channel_select(self, event): try: selection = self.channel_list.GetSelection() if selection != wx.NOT_FOUND: channel = self.channel_list.GetString(selection) self.switch_to_channel(channel) except Exception as e: logger.error(f"Error selecting channel: {e}") def on_channel_right_click(self, event): try: selection = self.channel_list.HitTest(event.GetPosition()) if selection != wx.NOT_FOUND: channel = self.channel_list.GetString(selection) menu = wx.Menu() menu.Append(1, "Part Channel") menu.Append(2, "Close Tab") self.Bind(wx.EVT_MENU, lambda e: self.part_channel(channel), id=1) self.Bind(wx.EVT_MENU, lambda e: self.close_channel(channel), id=2) self.PopupMenu(menu) menu.Destroy() except Exception as e: logger.error(f"Error in channel right click: {e}") def on_user_dclick(self, event): try: selection = self.users_list.GetSelection() if selection != wx.NOT_FOUND: user = self.users_list.GetString(selection) self.open_query(user) except Exception as e: logger.error(f"Error in user double click: {e}") def on_user_right_click(self, event): try: selection = self.users_list.HitTest(event.GetPosition()) if selection != wx.NOT_FOUND: user = self.users_list.GetString(selection) menu = wx.Menu() menu.Append(1, f"Query {user}") menu.Append(2, f"WHOIS {user}") if self.current_channel and self.current_channel.startswith('#'): menu.AppendSeparator() menu.Append(3, f"Kick {user}") self.Bind(wx.EVT_MENU, lambda e: self.open_query(user), id=1) self.Bind(wx.EVT_MENU, lambda e: self.whois_user(user), id=2) if self.current_channel and self.current_channel.startswith('#'): self.Bind(wx.EVT_MENU, lambda e: self.kick_user(user), id=3) self.PopupMenu(menu) menu.Destroy() except Exception as e: logger.error(f"Error in user right click: {e}") def open_query(self, user): try: if user not in self.channels: self.add_channel(user) self.switch_to_channel(user) except Exception as e: logger.error(f"Error opening query: {e}") def whois_user(self, user): try: if self.is_connected(): self.connection.whois([user]) except Exception as e: logger.error(f"Error in WHOIS: {e}") def kick_user(self, user): try: if self.current_channel and self.current_channel.startswith('#') and self.is_connected(): reason = wx.GetTextFromUser("Kick reason:", "Kick User", "Kicked") if reason is not None: self.connection.kick(self.current_channel, user, reason) except Exception as e: logger.error(f"Error kicking user: {e}") def switch_to_channel(self, channel): try: self.current_channel = channel for i in range(self.notebook.GetPageCount()): if self.notebook.GetPageText(i) == channel: self.notebook.SetSelection(i) break self.update_user_list(channel) except Exception as e: logger.error(f"Error switching channel: {e}") def update_user_list(self, channel): """Thread-safe user list update""" def _update_user_list(): try: self.users_list.Clear() if channel in self.channel_users: for user in sorted(self.channel_users[channel]): self.users_list.Append(user) except Exception as e: logger.error(f"Error updating user list: {e}") self.safe_ui_update(_update_user_list) def send_message(self, target, message): try: if message.startswith('/'): self.handle_command(target, message) elif target != "SERVER" and self.is_connected(): self.connection.privmsg(target, message) if target in self.channels: # Show our own message with our color user_color = self.get_user_color(self.nick) timestamp = self.get_timestamp() self.channels[target].add_formatted_message(timestamp, self.nick, message, user_color) except Exception as e: logger.error(f"Error sending message: {e}") self.safe_ui_update(self.log_server, f"Error sending message: {e}", wx.Colour(255, 100, 100)) def handle_command(self, target, message): try: parts = message[1:].split(' ', 1) cmd = parts[0].lower() args = parts[1] if len(parts) > 1 else "" if cmd == "help": help_text = """ Available commands: /join - Join a channel /part [channel] - Leave current or specified channel /msg - Send private message /me - Send action message /nick - Change nickname /whois - Get user information /topic [newtopic] - Get or set channel topic /kick [reason] - Kick user from channel /away [message] - Set away status /quit [message] - Disconnect from server /help - Show this help """ self.log_server(help_text, wx.Colour(200, 255, 200)) elif cmd == "me": if self.is_connected(): self.connection.action(target, args) user_color = self.get_user_color(self.nick) timestamp = self.get_timestamp() self.channels[target].add_formatted_message(timestamp, self.nick, args, user_color, is_action=True) elif cmd == "nick" and self.is_connected(): self.connection.nick(args) elif cmd == "join" and self.is_connected(): if args and not args.startswith('#'): args = '#' + args self.connection.join(args) elif cmd == "part" and self.is_connected(): channel = args if args else target self.connection.part(channel) elif cmd == "quit" and self.is_connected(): reason = args if args else "Goodbye" self.connection.quit(reason) elif cmd == "msg" and self.is_connected(): nick_msg = args.split(' ', 1) if len(nick_msg) == 2: self.connection.privmsg(nick_msg[0], nick_msg[1]) elif cmd == "whois" and self.is_connected(): self.connection.whois([args]) elif cmd == "kick" and self.is_connected(): kick_args = args.split(' ', 1) user = kick_args[0] reason = kick_args[1] if len(kick_args) > 1 else "Kicked" self.connection.kick(target, user, reason) elif cmd == "topic" and self.is_connected(): if args: self.connection.topic(target, args) else: self.connection.topic(target) elif cmd == "away" and self.is_connected(): self.connection.send_raw(f"AWAY :{args}" if args else "AWAY") self.away = bool(args) self.safe_ui_update(self.away_item.Check, self.away) else: self.safe_ui_update(self.log_server, f"Unknown command: {cmd}. Use /help for available commands.", wx.Colour(255, 100, 100)) except Exception as e: logger.error(f"Error handling command: {e}") self.safe_ui_update(self.log_server, f"Error executing command: {e}", wx.Colour(255, 100, 100)) def part_channel(self, channel): try: if self.is_connected(): self.connection.part(channel) except Exception as e: logger.error(f"Error parting channel: {e}") def close_channel(self, channel): try: if channel in self.channels and channel != "SERVER": def _close_channel(): for i in range(self.notebook.GetPageCount()): if self.notebook.GetPageText(i) == channel: self.notebook.DeletePage(i) break del self.channels[channel] idx = self.channel_list.FindString(channel) if idx != wx.NOT_FOUND: self.channel_list.Delete(idx) self.safe_ui_update(_close_channel) except Exception as e: logger.error(f"Error closing channel: {e}") def log_server(self, message, color=None, bold=False, italic=False, underline=False): try: if "SERVER" in self.channels: self.channels["SERVER"].add_system_message(f"{self.get_timestamp()}{message}", color, bold) except Exception as e: logger.error(f"Error logging server message: {e}") def log_channel_message(self, channel, username, message, is_action=False, is_system=False): """Log a message to a channel with username coloring""" try: if channel not in self.channels: self.safe_ui_update(self.add_channel, channel) if channel in self.channels: timestamp = self.get_timestamp() if is_system: # System messages (joins, parts, etc.) user_color = self.get_user_color(username) self.channels[channel].add_system_message(f"{timestamp}{message}", user_color) elif is_action: # Action messages (/me) user_color = self.get_user_color(username) self.channels[channel].add_formatted_message(timestamp, username, message, user_color, is_action=True) else: # Regular messages user_color = self.get_user_color(username) self.channels[channel].add_formatted_message(timestamp, username, message, user_color) # Check for highlights if self.highlights and any(h.lower() in message.lower() for h in self.highlights): self.safe_ui_update(wx.Bell) except Exception as e: logger.error(f"Error logging channel message: {e}") def add_channel(self, channel): try: if channel not in self.channels: panel = IRCPanel(self.notebook, self) panel.set_target(channel) self.notebook.AddPage(panel, channel) self.channels[channel] = panel if channel.startswith('#'): self.channel_list.Append(channel) except Exception as e: logger.error(f"Error adding channel: {e}") # Menu handlers def on_menu_join(self, event): try: dlg = wx.TextEntryDialog(self, "Enter channel name:", "Join Channel") if dlg.ShowModal() == wx.ID_OK: channel = dlg.GetValue() if channel and self.is_connected(): if not channel.startswith('#'): channel = '#' + channel self.connection.join(channel) dlg.Destroy() except Exception as e: logger.error(f"Error in menu join: {e}") def on_menu_part(self, event): try: if self.current_channel and self.current_channel != "SERVER": self.part_channel(self.current_channel) except Exception as e: logger.error(f"Error in menu part: {e}") def on_menu_close_tab(self, event): try: if self.current_channel and self.current_channel != "SERVER": self.close_channel(self.current_channel) except Exception as e: logger.error(f"Error in menu close tab: {e}") def on_menu_whois(self, event): try: dlg = wx.TextEntryDialog(self, "Enter nickname:", "WHOIS") if dlg.ShowModal() == wx.ID_OK: user = dlg.GetValue() if user and self.is_connected(): self.whois_user(user) dlg.Destroy() except Exception as e: logger.error(f"Error in menu whois: {e}") def on_menu_change_nick(self, event): try: dlg = wx.TextEntryDialog(self, "Enter new nickname:", "Change Nick", self.nick) if dlg.ShowModal() == wx.ID_OK: new_nick = dlg.GetValue() if new_nick and self.is_connected(): self.connection.nick(new_nick) dlg.Destroy() except Exception as e: logger.error(f"Error in menu change nick: {e}") def on_menu_away(self, event): try: if self.away and self.is_connected(): self.connection.send_raw("AWAY") self.away = False self.SetStatusText(f"Connected to {self.server}") elif self.is_connected(): msg = wx.GetTextFromUser("Away message:", "Set Away", "Away from keyboard") if msg is not None: self.connection.send_raw(f"AWAY :{msg}") self.away = True self.SetStatusText(f"Connected to {self.server} (Away)") except Exception as e: logger.error(f"Error in menu away: {e}") def on_menu_timestamps(self, event): try: self.timestamps = self.timestamp_item.IsChecked() except Exception as e: logger.error(f"Error in menu timestamps: {e}") def on_menu_highlights(self, event): try: current = ", ".join(self.highlights) dlg = wx.TextEntryDialog(self, "Enter highlight words (comma separated):", "Set Highlights", current) if dlg.ShowModal() == wx.ID_OK: text = dlg.GetValue() self.highlights = [h.strip() for h in text.split(',') if h.strip()] dlg.Destroy() except Exception as e: logger.error(f"Error in menu highlights: {e}") def on_menu_autojoin(self, event): try: current = ", ".join(self.auto_join_channels) dlg = wx.TextEntryDialog(self, "Enter channels to auto-join (comma separated):", "Auto-join Channels", current) if dlg.ShowModal() == wx.ID_OK: text = dlg.GetValue() self.auto_join_channels = [c.strip() for c in text.split(',') if c.strip()] dlg.Destroy() except Exception as e: logger.error(f"Error in menu autojoin: {e}") def on_menu_help(self, event): """Show command help""" help_text = """ IRC Client Help: BASIC USAGE: - Configure server/nick in left panel and click Connect - Join channels using the join box or /join command - Type messages in the input box at bottom - Use Up/Down arrows for message history - Tab for nickname completion in channels - Ctrl+F to search in chat history TEXT FORMATTING: - Usernames are colored for easy identification - URLs are automatically clickable - Each user has a consistent color COMMANDS (type /help in chat for full list): /join #channel - Join a channel /part - Leave current channel /msg nick message - Private message /me action - Action message /nick newname - Change nickname /away [message] - Set away status """ self.log_server(help_text, wx.Colour(200, 255, 200), bold=True) # IRC Event Handlers - All use thread-safe UI updates def on_welcome(self, connection, event): try: self.log_server("Connected to server!", wx.Colour(100, 255, 100), bold=True) self.log_server(f"Welcome message: {' '.join(event.arguments)}", wx.Colour(150, 255, 150)) # Auto-join channels for channel in self.auto_join_channels: if not channel.startswith('#'): channel = '#' + channel time.sleep(0.5) if self.is_connected(): connection.join(channel) except Exception as e: logger.error(f"Error in welcome handler: {e}") def on_join(self, connection, event): try: nick = event.source.nick channel = event.target if nick == self.nick: self.safe_ui_update(self.add_channel, channel) self.log_server(f"Joined channel {channel}", wx.Colour(100, 255, 100)) self.log_channel_message(channel, nick, f"→ {nick} joined", is_system=True) if nick not in self.channel_users[channel]: self.channel_users[channel].append(nick) if channel == self.current_channel: self.update_user_list(channel) except Exception as e: logger.error(f"Error in join handler: {e}") def on_part(self, connection, event): try: nick = event.source.nick channel = event.target reason = event.arguments[0] if event.arguments else "" msg = f"← {nick} left" if reason: msg += f" ({reason})" self.log_channel_message(channel, nick, msg, is_system=True) if nick in self.channel_users[channel]: self.channel_users[channel].remove(nick) if channel == self.current_channel: self.update_user_list(channel) except Exception as e: logger.error(f"Error in part handler: {e}") def on_quit(self, connection, event): try: nick = event.source.nick reason = event.arguments[0] if event.arguments else "Quit" for channel in list(self.channel_users.keys()): if nick in self.channel_users[channel]: self.channel_users[channel].remove(nick) self.log_channel_message(channel, nick, f"← {nick} quit ({reason})", is_system=True) if self.current_channel: self.update_user_list(self.current_channel) except Exception as e: logger.error(f"Error in quit handler: {e}") def on_pubmsg(self, connection, event): try: nick = event.source.nick channel = event.target message = event.arguments[0] # Check for action messages (/me) if message.startswith('\x01ACTION') and message.endswith('\x01'): message = message[8:-1] self.log_channel_message(channel, nick, message, is_action=True) else: self.log_channel_message(channel, nick, message) # Highlight own nick in messages if self.nick.lower() in message.lower(): self.safe_ui_update(wx.Bell) except Exception as e: logger.error(f"Error in pubmsg handler: {e}") def on_privmsg(self, connection, event): try: nick = event.source.nick message = event.arguments[0] # Check for action messages in private queries too if message.startswith('\x01ACTION') and message.endswith('\x01'): message = message[8:-1] self.log_channel_message(nick, nick, message, is_action=True) else: self.log_channel_message(nick, nick, message) except Exception as e: logger.error(f"Error in privmsg handler: {e}") def on_namreply(self, connection, event): try: channel = event.arguments[1] users = event.arguments[2].split() clean_users = [] for user in users: user = user.lstrip("@+%&~") clean_users.append(user) self.channel_users[channel].extend(clean_users) if channel == self.current_channel: self.update_user_list(channel) except Exception as e: logger.error(f"Error in namreply handler: {e}") def on_nick(self, connection, event): try: old_nick = event.source.nick new_nick = event.target # Update color mapping for the new nick if old_nick in self.user_colors: self.user_colors[new_nick] = self.user_colors[old_nick] del self.user_colors[old_nick] if old_nick == self.nick: self.nick = new_nick self.log_server(f"You are now known as {new_nick}", wx.Colour(150, 200, 255), bold=True) for channel in self.channel_users: if old_nick in self.channel_users[channel]: self.channel_users[channel].remove(old_nick) self.channel_users[channel].append(new_nick) self.log_channel_message(channel, new_nick, f"{old_nick} is now known as {new_nick}", is_system=True) if self.current_channel: self.update_user_list(self.current_channel) except Exception as e: logger.error(f"Error in nick handler: {e}") def on_mode(self, connection, event): try: channel = event.target mode = " ".join(event.arguments) nick = event.source.nick self.log_channel_message(channel, nick, f"Mode {mode} by {nick}", is_system=True) except Exception as e: logger.error(f"Error in mode handler: {e}") def on_notice(self, connection, event): try: nick = event.source.nick if hasattr(event.source, 'nick') else str(event.source) message = event.arguments[0] self.log_server(f"-{nick}- {message}", wx.Colour(255, 150, 255), italic=True) except Exception as e: logger.error(f"Error in notice handler: {e}") def on_disconnect(self, connection, event): try: self.log_server("Disconnected from server", wx.Colour(255, 100, 100), bold=True) self.safe_ui_update(self.on_disconnect_cleanup) except Exception as e: logger.error(f"Error in disconnect handler: {e}") def on_topic(self, connection, event): try: channel = event.arguments[0] if event.arguments else event.target topic = event.arguments[1] if len(event.arguments) > 1 else event.arguments[0] nick = event.source.nick if hasattr(event.source, 'nick') else "Server" self.log_channel_message(channel, nick, f"Topic: {topic}", is_system=True) except Exception as e: logger.error(f"Error in topic handler: {e}") def on_kick(self, connection, event): try: channel = event.target kicked = event.arguments[0] reason = event.arguments[1] if len(event.arguments) > 1 else "No reason" kicker = event.source.nick self.log_channel_message(channel, kicker, f"{kicked} was kicked by {kicker} ({reason})", is_system=True) if kicked in self.channel_users[channel]: self.channel_users[channel].remove(kicked) if channel == self.current_channel: self.update_user_list(channel) except Exception as e: logger.error(f"Error in kick handler: {e}") def on_whoisuser(self, connection, event): try: nick = event.arguments[0] user = event.arguments[1] host = event.arguments[2] realname = event.arguments[4] user_color = self.get_user_color(nick) self.log_server(f"WHOIS {nick}: {user}@{host} ({realname})", user_color) except Exception as e: logger.error(f"Error in whoisuser handler: {e}") def on_whoischannels(self, connection, event): try: nick = event.arguments[0] channels = event.arguments[1] user_color = self.get_user_color(nick) self.log_server(f"WHOIS {nick} channels: {channels}", user_color) except Exception as e: logger.error(f"Error in whoischannels handler: {e}") def on_whoisserver(self, connection, event): try: nick = event.arguments[0] server = event.arguments[1] user_color = self.get_user_color(nick) self.log_server(f"WHOIS {nick} server: {server}", user_color) except Exception as e: logger.error(f"Error in whoisserver handler: {e}") def on_close(self, event): try: # Stop UI timer first if self.ui_timer and self.ui_timer.IsRunning(): self.ui_timer.Stop() if self.is_connected(): self.disconnect() # Give it a moment to disconnect gracefully wx.CallLater(1000, self.Destroy) else: self.Destroy() except Exception as e: logger.error(f"Error during close: {e}") self.Destroy() if __name__ == "__main__": try: app = wx.App() frame = IRCFrame() app.MainLoop() except Exception as e: logger.critical(f"Fatal error: {e}") print(f"Fatal error: {e}") traceback.print_exc()