Files
ircdvd/main.py
rattatwinko 4fdceb46ff privacy message. and some more fixes. especially the seraching.
added a requirements file and a build script for linux.
2025-11-23 20:10:03 +01:00

2025 lines
84 KiB
Python

import wx
import irc.client
import threading
import re
import time
import traceback
from collections import defaultdict
from datetime import datetime
import socket
import logging
import queue
import os
import sys
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def get_resource_path(relative_path):
"""Get absolute path to resource, works for dev and for PyInstaller"""
try:
# PyInstaller creates a temp folder and stores path in _MEIPASS
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
import platform
import psutil
import socket
import getpass
import subprocess
import re
class PrivacyNoticeDialog(wx.Dialog):
def __init__(self, parent):
super().__init__(parent, title="Privacy Notice", style=wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER)
self.parent = parent
# Calculate optimal dialog size based on screen size
screen_width, screen_height = wx.DisplaySize()
self.max_width = min(700, screen_width * 0.75)
self.max_height = min(700, screen_height * 0.8)
self.SetMinSize((450, 400))
self.SetSize((self.max_width, self.max_height))
# Create main sizer
main_sizer = wx.BoxSizer(wx.VERTICAL)
# Create header with icon and title
header_sizer = wx.BoxSizer(wx.HORIZONTAL)
# Add info icon
info_icon = wx.ArtProvider.GetBitmap(wx.ART_INFORMATION, wx.ART_MESSAGE_BOX, (32, 32))
icon_ctrl = wx.StaticBitmap(self, -1, info_icon)
header_sizer.Add(icon_ctrl, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 10)
# Add title
title_text = wx.StaticText(self, label="Privacy and System Information")
title_font = wx.Font(14, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)
title_text.SetFont(title_font)
header_sizer.Add(title_text, 0, wx.ALL | wx.ALIGN_CENTER_VERTICAL, 10)
main_sizer.Add(header_sizer, 0, wx.EXPAND)
# Create scrolled window for the content
self.scrolled_win = wx.ScrolledWindow(self)
self.scrolled_win.SetScrollRate(10, 10)
scrolled_sizer = wx.BoxSizer(wx.VERTICAL)
# Get system information
system_info = self.get_system_info()
# Create system info section
sysinfo_text = wx.StaticText(self.scrolled_win, label="System Information:")
sysinfo_font = wx.Font(11, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)
sysinfo_text.SetFont(sysinfo_font)
scrolled_sizer.Add(sysinfo_text, 0, wx.ALL, 5)
# System info details
info_details = (
f"Operating System: {system_info['os']}\n"
f"Architecture: {system_info['architecture']}\n"
f"Processor: {system_info['processor']}\n"
f"Physical Cores: {system_info['physical_cores']}\n"
f"Total Cores: {system_info['total_cores']}\n"
f"Max Frequency: {system_info['max_frequency']} MHz\n"
f"Total RAM: {system_info['total_ram']} GB\n"
f"Available RAM: {system_info['available_ram']} GB\n"
f"Hostname: {system_info['hostname']}\n"
f"Username: {system_info['username']}\n"
f"Python Version: {system_info['python_version']}\n"
f"wxPython Version: {system_info['wx_version']}"
)
info_text = wx.StaticText(self.scrolled_win, label=info_details)
scrolled_sizer.Add(info_text, 0, wx.ALL | wx.EXPAND, 10)
# Add separator
scrolled_sizer.Add(wx.StaticLine(self.scrolled_win), 0, wx.EXPAND | wx.ALL, 10)
# Security and privacy notice
security_text = wx.StaticText(self.scrolled_win, label="Security and Privacy Notice:")
security_text.SetFont(sysinfo_font)
scrolled_sizer.Add(security_text, 0, wx.ALL, 5)
# Detect potential security features
security_warnings = self.get_security_warnings(system_info)
privacy_notice = (
"wxIRC is an Open Source project and must not be distributed for commercial purposes.\n\n"
"Security Considerations:\n"
f"{security_warnings}\n\n"
"Important Reminders:\n"
"Your system contains hardware level security processors that operate independently\n"
"Network traffic may be monitored by various entities\n"
"Never discuss or plan illegal activities through any communication platform\n"
"Keep your system and software updated for security patches\n"
"Use strong, unique passwords and enable 2FA where available\n\n"
"This application collects no personal data and makes no network connections\n"
"beyond those required for IRC functionality that you explicitly initiate."
)
self.body_text = wx.StaticText(self.scrolled_win, label=privacy_notice)
scrolled_sizer.Add(self.body_text, 1, wx.ALL | wx.EXPAND, 10)
self.scrolled_win.SetSizer(scrolled_sizer)
main_sizer.Add(self.scrolled_win, 1, wx.EXPAND | wx.ALL, 5)
# Add OK button
ok_btn = wx.Button(self, wx.ID_OK, "I Understand and Continue")
ok_btn.SetDefault()
ok_btn.SetMinSize((160, 35))
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
btn_sizer.AddStretchSpacer()
btn_sizer.Add(ok_btn, 0, wx.ALL, 10)
btn_sizer.AddStretchSpacer()
main_sizer.Add(btn_sizer, 0, wx.EXPAND)
self.SetSizer(main_sizer)
# Center on parent
self.CentreOnParent()
# Bind events
self.Bind(wx.EVT_BUTTON, self.on_ok, ok_btn)
self.Bind(wx.EVT_SIZE, self.on_resize)
# Fit the dialog to content
self.Fit()
self.adjust_to_screen()
def get_system_info(self):
"""Gather comprehensive system information with improved processor detection"""
try:
# CPU information
cpu_freq = psutil.cpu_freq()
max_freq = int(cpu_freq.max) if cpu_freq else "N/A"
# Memory information
memory = psutil.virtual_memory()
total_ram_gb = round(memory.total / (1024**3), 1)
available_ram_gb = round(memory.available / (1024**3), 1)
# Platform information
system = platform.system()
if system == "Windows":
os_info = f"Windows {platform.release()}"
elif system == "Linux":
# Try to get distro info for Linux
try:
with open('/etc/os-release', 'r') as f:
lines = f.readlines()
for line in lines:
if line.startswith('PRETTY_NAME='):
os_info = line.split('=', 1)[1].strip().strip('"')
break
else:
os_info = f"Linux ({platform.release()})"
except:
os_info = f"Linux ({platform.release()})"
elif system == "Darwin":
os_info = f"macOS {platform.mac_ver()[0]}"
else:
os_info = f"{system} {platform.release()}"
# Improved processor detection
processor = self.detect_processor()
return {
'os': os_info,
'architecture': platform.architecture()[0],
'processor': processor,
'physical_cores': psutil.cpu_count(logical=False),
'total_cores': psutil.cpu_count(logical=True),
'max_frequency': max_freq,
'total_ram': total_ram_gb,
'available_ram': available_ram_gb,
'hostname': socket.gethostname(),
'username': getpass.getuser(),
'python_version': platform.python_version(),
'wx_version': wx.VERSION_STRING,
'platform': platform.platform()
}
except Exception as e:
logger.error(f"Error gathering system info: {e}")
return {
'os': "Unknown",
'architecture': "Unknown",
'processor': "Unknown",
'physical_cores': "N/A",
'total_cores': "N/A",
'max_frequency': "N/A",
'total_ram': "N/A",
'available_ram': "N/A",
'hostname': "Unknown",
'username': "Unknown",
'python_version': platform.python_version(),
'wx_version': wx.VERSION_STRING,
'platform': "Unknown"
}
def detect_processor(self):
"""Improved processor detection for Ryzen and other CPUs"""
try:
# Method 1: Try platform.processor() first
processor = platform.processor()
if processor and processor.strip() and processor != "unknown":
return processor.strip()
# Method 2: Try CPU info file on Linux
if platform.system() == "Linux":
try:
with open('/proc/cpuinfo', 'r') as f:
cpuinfo = f.read()
# Look for model name
for line in cpuinfo.split('\n'):
if line.startswith('model name') or line.startswith('Model Name') or line.startswith('Processor'):
parts = line.split(':', 1)
if len(parts) > 1:
cpu_name = parts[1].strip()
if cpu_name:
return cpu_name
# Look for hardware field
for line in cpuinfo.split('\n'):
if line.startswith('Hardware'):
parts = line.split(':', 1)
if len(parts) > 1:
cpu_name = parts[1].strip()
if cpu_name:
return cpu_name
except Exception as e:
logger.debug(f"Error reading /proc/cpuinfo: {e}")
# Method 3: Try lscpu command
try:
result = subprocess.run(['lscpu'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if 'Model name:' in line:
parts = line.split(':', 1)
if len(parts) > 1:
cpu_name = parts[1].strip()
if cpu_name:
return cpu_name
except Exception as e:
logger.debug(f"Error running lscpu: {e}")
# Method 4: Try sysctl on macOS
if platform.system() == "Darwin":
try:
result = subprocess.run(['sysctl', '-n', 'machdep.cpu.brand_string'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
cpu_name = result.stdout.strip()
if cpu_name:
return cpu_name
except Exception as e:
logger.debug(f"Error running sysctl: {e}")
# Method 5: Try WMI on Windows
if platform.system() == "Windows":
try:
import ctypes
from ctypes import wintypes
# Try kernel32 GetNativeSystemInfo
kernel32 = ctypes.windll.kernel32
class SYSTEM_INFO(ctypes.Structure):
_fields_ = [
("wProcessorArchitecture", wintypes.WORD),
("wReserved", wintypes.WORD),
("dwPageSize", wintypes.DWORD),
("lpMinimumApplicationAddress", ctypes.c_void_p),
("lpMaximumApplicationAddress", ctypes.c_void_p),
("dwActiveProcessorMask", ctypes.c_void_p),
("dwNumberOfProcessors", wintypes.DWORD),
("dwProcessorType", wintypes.DWORD),
("dwAllocationGranularity", wintypes.DWORD),
("wProcessorLevel", wintypes.WORD),
("wProcessorRevision", wintypes.WORD)
]
system_info = SYSTEM_INFO()
kernel32.GetNativeSystemInfo(ctypes.byref(system_info))
# Map processor type to name
processor_types = {
586: "Pentium",
8664: "x64",
}
if system_info.dwProcessorType in processor_types:
return processor_types[system_info.dwProcessorType]
except Exception as e:
logger.debug(f"Error with Windows processor detection: {e}")
# Final fallback
return f"Unknown ({platform.machine()})"
except Exception as e:
logger.error(f"Error in processor detection: {e}")
return "Unknown"
def get_security_warnings(self, system_info):
"""Generate security warnings based on system capabilities"""
warnings = []
try:
# Check for Intel ME / AMD PSP indicators
processor_lower = system_info['processor'].lower()
if 'intel' in processor_lower:
warnings.append("Intel Processor detected: Management Engine (ME) present")
if 'core' in processor_lower and any(gen in processor_lower for gen in ['i3', 'i5', 'i7', 'i9']):
warnings.append("Modern Intel Core processor: ME capabilities active")
elif 'amd' in processor_lower:
warnings.append("AMD Processor detected: Platform Security Processor (PSP) present")
if 'ryzen' in processor_lower:
warnings.append("Modern AMD Ryzen processor: PSP capabilities active")
if '5600x' in processor_lower:
warnings.append("AMD Ryzen 5 5600X: Zen 3 architecture with PSP")
# Check for specific AMD models
if '5600x' in processor_lower:
warnings.append("AMD Ryzen 5 5600X detected: Hardware level security processor active")
# Check RAM size
if system_info['total_ram'] != "N/A" and system_info['total_ram'] > 8:
warnings.append("System has substantial RAM capacity for background processes")
# Check core count
if system_info['total_cores'] != "N/A" and system_info['total_cores'] >= 4:
warnings.append("Multi core system capable of parallel background processing")
# Network capabilities
warnings.append("System has network connectivity capabilities")
warnings.append("WiFi/Ethernet hardware present for data transmission")
# Operating system specific
os_lower = system_info['os'].lower()
if 'windows' in os_lower:
warnings.append("Windows OS: Telemetry and background services active")
elif 'linux' in os_lower or 'arch' in os_lower:
warnings.append("Linux OS: Generally more transparent, but hardware level components still active")
if 'arch' in os_lower:
warnings.append("Arch Linux: Rolling release, ensure regular security updates")
elif 'mac' in os_lower:
warnings.append("macOS: Apple T2/T1 security chip or Apple Silicon present")
except Exception as e:
logger.error(f"Error generating security warnings: {e}")
warnings = ["Unable to fully assess system security features"]
return "\n".join(warnings)
def adjust_to_screen(self):
"""Adjust dialog size and position to fit within screen bounds"""
screen_width, screen_height = wx.DisplaySize()
current_size = self.GetSize()
final_width = min(current_size.width, screen_width - 40)
final_height = min(current_size.height, screen_height - 40)
self.SetSize(final_width, final_height)
self.body_text.Wrap(final_width - 60)
self.Layout()
self.scrolled_win.FitInside()
self.CentreOnParent()
def on_resize(self, event):
"""Handle dialog resize to re-wrap text appropriately"""
try:
if hasattr(self, 'body_text'):
new_width = self.GetSize().width - 60
if new_width > 200:
self.body_text.Wrap(new_width)
self.scrolled_win.Layout()
self.scrolled_win.FitInside()
except Exception as e:
logger.error(f"Error in resize handler: {e}")
event.Skip()
def on_ok(self, event):
self.EndModal(wx.ID_OK)
class UIUpdate:
"""Thread-safe UI update container"""
def __init__(self, callback, *args, **kwargs):
self.callback = callback
self.args = args
self.kwargs = kwargs
class SearchDialog(wx.Dialog):
def __init__(self, parent):
super().__init__(parent, title="Search", style=wx.DEFAULT_DIALOG_STYLE)
self.parent = parent
sizer = wx.BoxSizer(wx.VERTICAL)
# Search input
search_sizer = wx.BoxSizer(wx.HORIZONTAL)
search_sizer.Add(wx.StaticText(self, label="Search:"), 0, wx.ALIGN_CENTER_VERTICAL | wx.ALL, 5)
self.search_ctrl = wx.TextCtrl(self, size=(200, -1), style=wx.TE_PROCESS_ENTER)
self.search_ctrl.Bind(wx.EVT_TEXT_ENTER, self.on_search)
search_sizer.Add(self.search_ctrl, 1, wx.EXPAND | wx.ALL, 5)
sizer.Add(search_sizer, 0, wx.EXPAND)
# Options
options_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.case_sensitive = wx.CheckBox(self, label="Case sensitive")
self.whole_word = wx.CheckBox(self, label="Whole word")
options_sizer.Add(self.case_sensitive, 0, wx.ALL, 5)
options_sizer.Add(self.whole_word, 0, wx.ALL, 5)
sizer.Add(options_sizer, 0, wx.EXPAND)
# Buttons
btn_sizer = wx.StdDialogButtonSizer()
self.search_btn = wx.Button(self, wx.ID_OK, "Search")
self.search_btn.SetDefault()
self.search_btn.Bind(wx.EVT_BUTTON, self.on_search)
btn_sizer.AddButton(self.search_btn)
close_btn = wx.Button(self, wx.ID_CANCEL, "Close")
btn_sizer.AddButton(close_btn)
btn_sizer.Realize()
sizer.Add(btn_sizer, 0, wx.ALIGN_CENTER | wx.ALL, 10)
self.SetSizer(sizer)
self.Fit()
self.search_ctrl.SetFocus()
def on_search(self, event):
search_text = self.search_ctrl.GetValue().strip()
if search_text:
self.parent.perform_search(
search_text,
self.case_sensitive.IsChecked(),
self.whole_word.IsChecked()
)
class AboutDialog(wx.Dialog):
def __init__(self, parent):
super().__init__(parent, title="About wxIRC Client", style=wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER)
sizer = wx.BoxSizer(wx.VERTICAL)
# Application info
info_text = wx.StaticText(self, label="wxIRC Client")
info_font = wx.Font(14, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)
info_text.SetFont(info_font)
version_text = wx.StaticText(self, label=f"V 1.1.1.0")
version_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
version_text.SetFont(version_font)
features_font = wx.Font(10, wx.FONTFAMILY_DEFAULT, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_BOLD)
# Add to sizer
sizer.Add(info_text, 0, wx.ALL | wx.ALIGN_CENTER, 10)
sizer.Add(version_text, 0, wx.ALL | wx.ALIGN_CENTER, 5)
# OK button
ok_btn = wx.Button(self, wx.ID_OK, "OK")
ok_btn.SetDefault()
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
btn_sizer.Add(ok_btn, 0, wx.ALIGN_CENTER | wx.ALL, 10)
sizer.Add(btn_sizer, 0, wx.ALIGN_CENTER)
self.SetSizer(sizer)
self.Fit()
self.Centre()
class IRCPanel(wx.Panel):
def __init__(self, parent, main_frame):
super().__init__(parent)
self.parent = parent
self.main_frame = main_frame
self.messages = []
sizer = wx.BoxSizer(wx.VERTICAL)
# Use a better font for chat
self.text_ctrl = wx.TextCtrl(self, style=wx.TE_MULTILINE | wx.TE_READONLY | wx.TE_RICH2 | wx.TE_AUTO_URL)
self.text_ctrl.SetBackgroundColour(wx.Colour(30, 30, 30))
self.text_ctrl.SetForegroundColour(wx.Colour(220, 220, 220))
# Load Fira Code font
self.font = self.load_fira_code_font()
self.text_ctrl.SetFont(self.font)
sizer.Add(self.text_ctrl, 1, wx.EXPAND | wx.ALL, 0)
input_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.input_ctrl = wx.TextCtrl(self, style=wx.TE_PROCESS_ENTER)
self.input_ctrl.SetHint("Type message here …")
self.input_ctrl.Bind(wx.EVT_TEXT_ENTER, self.on_send)
self.input_ctrl.Bind(wx.EVT_KEY_DOWN, self.on_key_down)
send_btn = wx.Button(self, label="Send")
send_btn.SetToolTip("Send message (Enter)")
send_btn.Bind(wx.EVT_BUTTON, self.on_send)
input_sizer.Add(self.input_ctrl, 1, wx.EXPAND | wx.ALL, 2)
input_sizer.Add(send_btn, 0, wx.ALL, 2)
sizer.Add(input_sizer, 0, wx.EXPAND | wx.ALL, 0)
self.SetSizer(sizer)
self.target = None
self.history = []
self.history_pos = -1
# Search state
self.search_text = ""
self.search_positions = []
self.current_search_index = -1
# Bind Ctrl+F for search
self.text_ctrl.Bind(wx.EVT_KEY_DOWN, self.on_text_key_down)
accel_tbl = wx.AcceleratorTable([(wx.ACCEL_CTRL, ord('F'), wx.ID_FIND)])
self.SetAcceleratorTable(accel_tbl)
self.Bind(wx.EVT_MENU, self.on_search, id=wx.ID_FIND)
def load_fira_code_font(self):
"""Load Fira Code font from resources"""
try:
# Try to use Fira Code if available
font_path = get_resource_path("FiraCode-Regular.ttf")
if os.path.exists(font_path):
# On wxPython 4.1+, we can try to add the font to the font manager
try:
font_collection = wx.private.FontCollection()
if font_collection.AddFont(font_path):
font = wx.Font(10, wx.FONTFAMILY_TELETYPE, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL,
False, "Fira Code")
if font.IsOk():
logger.info("Successfully loaded Fira Code font")
return font
except Exception:
pass
# Fallback: try to create font by name
font = wx.Font(wx.FontInfo(10).Family(wx.FONTFAMILY_TELETYPE).FaceName("Fira Code"))
if font.IsOk():
logger.info("Using Fira Code font via FaceName")
return font
else:
logger.warning("FiraCode-Regular.ttf not found in resources")
except Exception as e:
logger.error(f"Error loading Fira Code font: {e}")
# Fall back to system monospace font
font = wx.Font(10, wx.FONTFAMILY_TELETYPE, wx.FONTSTYLE_NORMAL, wx.FONTWEIGHT_NORMAL)
logger.info("Using system monospace font as fallback")
return font
def set_target(self, target):
self.target = target
def add_message(self, message, username=None, username_color=None, message_color=None, bold=False, italic=False, underline=False):
"""Thread-safe message addition with username coloring"""
try:
# Use CallAfter for thread safety
if wx.IsMainThread():
self._add_message_safe(message, username, username_color, message_color, bold, italic, underline)
else:
wx.CallAfter(self._add_message_safe, message, username, username_color, message_color, bold, italic, underline)
except Exception as e:
logger.error(f"Error in add_message: {e}")
def _add_message_safe(self, message, username=None, username_color=None, message_color=None, bold=False, italic=False, underline=False):
"""Actually add message - must be called from main thread"""
try:
self.messages.append(message)
# Save current position for formatting
start_pos = self.text_ctrl.GetLastPosition()
if username and username_color:
# Add username with its color
attr = wx.TextAttr()
attr.SetTextColour(username_color)
if bold:
attr.SetFontWeight(wx.FONTWEIGHT_BOLD)
if italic:
attr.SetFontStyle(wx.FONTSTYLE_ITALIC)
if underline:
attr.SetFontUnderlined(True)
attr.SetFont(self.font)
self.text_ctrl.SetDefaultStyle(attr)
self.text_ctrl.AppendText(username)
# Add the rest of the message with message color
attr = wx.TextAttr()
if message_color:
attr.SetTextColour(message_color)
else:
attr.SetTextColour(wx.Colour(220, 220, 220))
attr.SetFont(self.font)
self.text_ctrl.SetDefaultStyle(attr)
# Append the message (without username if we already added it)
if username and username_color:
# Find the message part after username
message_text = message[message.find(username) + len(username):]
self.text_ctrl.AppendText(message_text + "\n")
else:
self.text_ctrl.AppendText(message + "\n")
# Auto-scroll to bottom
self.text_ctrl.ShowPosition(self.text_ctrl.GetLastPosition())
except Exception as e:
logger.error(f"Error adding message safely: {e}")
def add_formatted_message(self, timestamp, username, content, username_color=None, is_action=False):
"""Add a formatted message with colored username"""
try:
if is_action:
message = f"{timestamp}* {username} {content}"
self.add_message(message, f"* {username}", username_color, wx.Colour(255, 150, 255), italic=True)
else:
message = f"{timestamp}<{username}> {content}"
self.add_message(message, f"<{username}>", username_color, wx.Colour(220, 220, 220))
except Exception as e:
logger.error(f"Error adding formatted message: {e}")
def add_system_message(self, message, color=None, bold=False):
"""Add system message without username coloring"""
try:
if color is None:
color = wx.Colour(180, 180, 255)
self.add_message(message, None, None, color, bold, False, False)
except Exception as e:
logger.error(f"Error adding system message: {e}")
def on_text_key_down(self, event):
"""Handle key events in the text control"""
keycode = event.GetKeyCode()
if event.ControlDown() and keycode == ord('F'):
self.on_search(event)
else:
event.Skip()
def on_search(self, event):
"""Open search dialog"""
try:
dlg = SearchDialog(self)
dlg.ShowModal()
dlg.Destroy()
except Exception as e:
logger.error(f"Error in search: {e}")
def perform_search(self, search_text, case_sensitive=False, whole_word=False):
"""Perform text search in the chat history"""
try:
self.search_text = search_text
self.search_positions = []
self.current_search_index = -1
# Get all text
full_text = self.text_ctrl.GetValue()
if not full_text or not search_text:
return
# Prepare search parameters
search_flags = 0
if not case_sensitive:
# For manual search, we'll handle case sensitivity ourselves
search_text_lower = search_text.lower()
full_text_lower = full_text.lower()
# Manual search implementation since wx.TextCtrl doesn't have FindText
pos = 0
while pos < len(full_text):
if case_sensitive:
# Case sensitive search
found_pos = full_text.find(search_text, pos)
else:
# Case insensitive search
found_pos = full_text_lower.find(search_text_lower, pos)
if found_pos == -1:
break
# For whole word search, verify boundaries
if whole_word:
# Check if it's a whole word
is_word_start = (found_pos == 0 or not full_text[found_pos-1].isalnum())
is_word_end = (found_pos + len(search_text) >= len(full_text) or
not full_text[found_pos + len(search_text)].isalnum())
if is_word_start and is_word_end:
self.search_positions.append(found_pos)
pos = found_pos + 1 # Move forward to avoid infinite loop
else:
self.search_positions.append(found_pos)
pos = found_pos + len(search_text)
if self.search_positions:
self.current_search_index = 0
self.highlight_search_result()
self.main_frame.SetStatusText(f"Found {len(self.search_positions)} occurrences of '{search_text}'")
else:
self.main_frame.SetStatusText(f"Text '{search_text}' not found")
wx.Bell()
except Exception as e:
logger.error(f"Error performing search: {e}")
traceback.print_exc()
def highlight_search_result(self):
"""Highlight the current search result"""
try:
if not self.search_positions or self.current_search_index < 0:
return
pos = self.search_positions[self.current_search_index]
# Select the found text
self.text_ctrl.SetSelection(pos, pos + len(self.search_text))
self.text_ctrl.ShowPosition(pos)
# Update status
self.main_frame.SetStatusText(
f"Found {self.current_search_index + 1} of {len(self.search_positions)}: '{self.search_text}'"
)
except Exception as e:
logger.error(f"Error highlighting search result: {e}")
def find_next(self):
"""Find next occurrence"""
if self.search_positions:
self.current_search_index = (self.current_search_index + 1) % len(self.search_positions)
self.highlight_search_result()
def find_previous(self):
"""Find previous occurrence"""
if self.search_positions:
self.current_search_index = (self.current_search_index - 1) % len(self.search_positions)
self.highlight_search_result()
def on_key_down(self, event):
try:
keycode = event.GetKeyCode()
if keycode == wx.WXK_UP:
if self.history and self.history_pos < len(self.history) - 1:
self.history_pos += 1
self.input_ctrl.SetValue(self.history[-(self.history_pos + 1)])
elif keycode == wx.WXK_DOWN:
if self.history_pos > 0:
self.history_pos -= 1
self.input_ctrl.SetValue(self.history[-(self.history_pos + 1)])
elif self.history_pos == 0:
self.history_pos = -1
self.input_ctrl.Clear()
elif keycode == wx.WXK_TAB:
# Tab completion for nicknames
self.handle_tab_completion()
return # Don't skip to prevent default tab behavior
elif keycode == wx.WXK_F3:
# F3 for find next
if self.search_positions:
self.find_next()
return
elif event.ShiftDown() and keycode == wx.WXK_F3:
# Shift+F3 for find previous
if self.search_positions:
self.find_previous()
return
else:
event.Skip()
except Exception as e:
logger.error(f"Error in key handler: {e}")
event.Skip()
def handle_tab_completion(self):
"""Handle tab completion for nicknames"""
try:
current_text = self.input_ctrl.GetValue()
if not current_text or not self.target or not self.target.startswith('#'):
return
users = self.main_frame.channel_users.get(self.target, [])
if not users:
return
# Find word at cursor position
pos = self.input_ctrl.GetInsertionPoint()
text_before = current_text[:pos]
words = text_before.split()
if not words:
return
current_word = words[-1]
# Find matching nicks
matches = [user for user in users if user.lower().startswith(current_word.lower())]
if matches:
if len(matches) == 1:
# Single match - complete it
new_word = matches[0]
if ':' in text_before or text_before.strip().endswith(current_word):
# Replace the current word
new_text = text_before[:-len(current_word)] + new_word + current_text[pos:]
self.input_ctrl.SetValue(new_text)
self.input_ctrl.SetInsertionPoint(pos - len(current_word) + len(new_word))
else:
# Multiple matches - show in status
self.main_frame.SetStatusText(f"Tab completion: {', '.join(matches[:5])}{'...' if len(matches) > 5 else ''}")
except Exception as e:
logger.error(f"Error in tab completion: {e}")
def on_send(self, event):
try:
message = self.input_ctrl.GetValue().strip()
if message and self.target:
self.history.append(message)
self.history_pos = -1
self.main_frame.send_message(self.target, message)
self.input_ctrl.Clear()
except Exception as e:
logger.error(f"Error sending message: {e}")
class IRCFrame(wx.Frame):
def __init__(self):
super().__init__(None, title="wxIRC", size=(1200, 700))
# Show privacy notice first
self.show_privacy_notice()
self.reactor = None
self.connection = None
self.reactor_thread = None
self.connection_lock = threading.RLock()
self.is_connecting = False
self.is_disconnecting = False
self.ui_update_queue = queue.Queue()
self.ui_timer = None
self.channels = {}
self.channel_users = defaultdict(list)
self.current_channel = None
self.nick = ""
self.server = ""
self.port = 6667
self.highlights = []
self.auto_join_channels = []
self.away = False
self.timestamps = True
# User color mapping
self.user_colors = {}
self.available_colors = [
wx.Colour(255, 150, 150), # Light red
wx.Colour(150, 255, 150), # Light green
wx.Colour(150, 200, 255), # Light blue
wx.Colour(255, 255, 150), # Light yellow
wx.Colour(255, 150, 255), # Light magenta
wx.Colour(150, 255, 255), # Light cyan
wx.Colour(255, 200, 150), # Light orange
wx.Colour(200, 150, 255), # Light purple
wx.Colour(255, 200, 200), # Pink
wx.Colour(200, 255, 200), # Mint
wx.Colour(200, 200, 255), # Lavender
wx.Colour(255, 255, 200), # Cream
]
self.color_index = 0
self.setup_irc_handlers()
self.create_menubar()
self.setup_ui()
# Start UI update timer
self.ui_timer = wx.Timer(self)
self.Bind(wx.EVT_TIMER, self.process_ui_updates, self.ui_timer)
self.ui_timer.Start(50) # Process UI updates every 50ms
self.CreateStatusBar()
self.SetStatusText("Not connected")
self.Centre()
self.Show()
self.Bind(wx.EVT_CLOSE, self.on_close)
# Bind global accelerators for search and quick escape
accel_tbl = wx.AcceleratorTable([
(wx.ACCEL_CTRL, ord('F'), wx.ID_FIND),
(wx.ACCEL_NORMAL, wx.WXK_F3, 1001),
(wx.ACCEL_SHIFT, wx.WXK_F3, 1002),
(wx.ACCEL_CTRL, wx.WXK_ESCAPE, 1003), # Quick Escape
])
self.SetAcceleratorTable(accel_tbl)
self.Bind(wx.EVT_MENU, self.on_global_search, id=wx.ID_FIND)
self.Bind(wx.EVT_MENU, self.on_find_next, id=1001)
self.Bind(wx.EVT_MENU, self.on_find_previous, id=1002)
self.Bind(wx.EVT_MENU, self.on_quick_escape, id=1003)
def show_privacy_notice(self):
"""Show privacy notice dialog at startup"""
dlg = PrivacyNoticeDialog(self)
dlg.ShowModal()
dlg.Destroy()
def get_user_color(self, username):
"""Get a consistent color for a username"""
if username not in self.user_colors:
self.user_colors[username] = self.available_colors[self.color_index]
self.color_index = (self.color_index + 1) % len(self.available_colors)
return self.user_colors[username]
def on_global_search(self, event):
"""Handle global Ctrl+F search"""
current_panel = self.get_current_panel()
if current_panel:
current_panel.on_search(event)
def on_find_next(self, event):
"""Handle F3 for find next"""
current_panel = self.get_current_panel()
if current_panel and hasattr(current_panel, 'find_next'):
current_panel.find_next()
def on_find_previous(self, event):
"""Handle Shift+F3 for find previous"""
current_panel = self.get_current_panel()
if current_panel and hasattr(current_panel, 'find_previous'):
current_panel.find_previous()
def on_quick_escape(self, event):
"""Handle Ctrl+Esc for quick escape - exit immediately"""
try:
# Stop UI timer first
if self.ui_timer and self.ui_timer.IsRunning():
self.ui_timer.Stop()
# Force disconnect without sending quit message
with self.connection_lock:
if self.connection and self.connection.is_connected():
try:
self.connection.close()
except:
pass
# Exit immediately
wx.CallAfter(self.Destroy)
except Exception as e:
logger.error(f"Error in quick escape: {e}")
wx.CallAfter(self.Destroy)
def get_current_panel(self):
"""Get the currently active IRC panel"""
try:
current_page = self.notebook.GetCurrentPage()
if isinstance(current_page, IRCPanel):
return current_page
except:
pass
return None
def setup_ui(self):
"""Setup UI components"""
panel = wx.Panel(self)
main_sizer = wx.BoxSizer(wx.HORIZONTAL)
# Left sidebar
left_panel = wx.Panel(panel)
left_panel.SetBackgroundColour(wx.Colour(45, 45, 45))
left_sizer = wx.BoxSizer(wx.VERTICAL)
conn_box = wx.StaticBox(left_panel, label="Connection")
conn_box.SetToolTip("Configure IRC server connection")
conn_box_sizer = wx.StaticBoxSizer(conn_box, wx.VERTICAL)
conn_box_sizer.Add(wx.StaticText(conn_box, label="Server:"), 0, wx.ALL, 2)
self.server_ctrl = wx.TextCtrl(conn_box, value="irc.libera.chat")
self.server_ctrl.SetToolTip("IRC server address")
conn_box_sizer.Add(self.server_ctrl, 0, wx.EXPAND | wx.ALL, 2)
conn_box_sizer.Add(wx.StaticText(conn_box, label="Port:"), 0, wx.ALL, 2)
self.port_ctrl = wx.TextCtrl(conn_box, value="6667")
self.port_ctrl.SetToolTip("IRC server port (usually 6667)")
conn_box_sizer.Add(self.port_ctrl, 0, wx.EXPAND | wx.ALL, 2)
conn_box_sizer.Add(wx.StaticText(conn_box, label="Nick:"), 0, wx.ALL, 2)
self.nick_ctrl = wx.TextCtrl(conn_box, value="wxircuser")
self.nick_ctrl.SetToolTip("Your nickname on IRC")
conn_box_sizer.Add(self.nick_ctrl, 0, wx.EXPAND | wx.ALL, 2)
self.connect_btn = wx.Button(conn_box, label="Connect")
self.connect_btn.SetToolTip("Connect to IRC server")
self.connect_btn.Bind(wx.EVT_BUTTON, self.on_connect)
conn_box_sizer.Add(self.connect_btn, 0, wx.EXPAND | wx.ALL, 5)
left_sizer.Add(conn_box_sizer, 0, wx.EXPAND | wx.ALL, 5)
# Channel management
chan_box = wx.StaticBox(left_panel, label="Channels")
chan_box.SetToolTip("Join and manage channels")
chan_box_sizer = wx.StaticBoxSizer(chan_box, wx.VERTICAL)
join_sizer = wx.BoxSizer(wx.HORIZONTAL)
self.channel_input = wx.TextCtrl(chan_box, style=wx.TE_PROCESS_ENTER)
self.channel_input.SetHint("Enter channel name …")
self.channel_input.SetToolTip("Type channel name …")
self.channel_input.Bind(wx.EVT_TEXT_ENTER, self.on_join_channel)
join_btn = wx.Button(chan_box, label="Join")
join_btn.SetToolTip("Join channel")
join_btn.Bind(wx.EVT_BUTTON, self.on_join_channel)
join_sizer.Add(self.channel_input, 1, wx.EXPAND | wx.ALL, 2)
join_sizer.Add(join_btn, 0, wx.ALL, 2)
self.channel_list = wx.ListBox(chan_box)
self.channel_list.SetToolTip("Joined channels")
self.channel_list.Bind(wx.EVT_LISTBOX, self.on_channel_select)
self.channel_list.Bind(wx.EVT_RIGHT_DOWN, self.on_channel_right_click)
chan_box_sizer.Add(join_sizer, 0, wx.EXPAND | wx.ALL, 2)
chan_box_sizer.Add(self.channel_list, 1, wx.EXPAND | wx.ALL, 2)
left_sizer.Add(chan_box_sizer, 1, wx.EXPAND | wx.ALL, 5)
left_panel.SetSizer(left_sizer)
# Center - Notebook
self.notebook = wx.Notebook(panel)
# Server panel
server_panel = IRCPanel(self.notebook, self)
server_panel.set_target("SERVER")
self.notebook.AddPage(server_panel, "Server")
self.channels["SERVER"] = server_panel
# Right sidebar - Users
right_panel = wx.Panel(panel)
right_panel.SetBackgroundColour(wx.Colour(45, 45, 45))
right_sizer = wx.BoxSizer(wx.VERTICAL)
users_box = wx.StaticBox(right_panel, label="Users")
users_box_sizer = wx.StaticBoxSizer(users_box, wx.VERTICAL)
self.users_list = wx.ListBox(users_box)
self.users_list.Bind(wx.EVT_LISTBOX_DCLICK, self.on_user_dclick)
self.users_list.Bind(wx.EVT_RIGHT_DOWN, self.on_user_right_click)
users_box_sizer.Add(self.users_list, 1, wx.EXPAND | wx.ALL, 2)
right_sizer.Add(users_box_sizer, 1, wx.EXPAND | wx.ALL, 5)
right_panel.SetSizer(right_sizer)
# Add to main sizer
main_sizer.Add(left_panel, 0, wx.EXPAND | wx.ALL, 0)
main_sizer.Add(self.notebook, 1, wx.EXPAND | wx.ALL, 0)
main_sizer.Add(right_panel, 0, wx.EXPAND | wx.ALL, 0)
left_panel.SetMinSize((220, -1))
right_panel.SetMinSize((180, -1))
panel.SetSizer(main_sizer)
def process_ui_updates(self, event):
"""Process queued UI updates from timer event"""
try:
while True:
try:
update = self.ui_update_queue.get_nowait()
if update and update.callback:
update.callback(*update.args, **update.kwargs)
except queue.Empty:
break
except Exception as e:
logger.error(f"Error processing UI updates: {e}")
def safe_ui_update(self, callback, *args, **kwargs):
"""Safely update UI from any thread"""
try:
if wx.IsMainThread():
callback(*args, **kwargs)
else:
self.ui_update_queue.put(UIUpdate(callback, *args, **kwargs))
except Exception as e:
logger.error(f"Error in safe_ui_update: {e}")
def create_menubar(self):
try:
menubar = wx.MenuBar()
# File menu
file_menu = wx.Menu()
file_menu.Append(300, "&About", "About wxIRC Client")
file_menu.AppendSeparator()
file_menu.Append(wx.ID_EXIT, "E&xit\tCtrl+Q")
self.Bind(wx.EVT_MENU, self.on_about, id=300)
self.Bind(wx.EVT_MENU, self.on_close, id=wx.ID_EXIT)
# Edit menu with search
edit_menu = wx.Menu()
edit_menu.Append(wx.ID_FIND, "&Find\tCtrl+F")
self.Bind(wx.EVT_MENU, self.on_global_search, id=wx.ID_FIND)
edit_menu.Append(1001, "Find &Next\tF3")
edit_menu.Append(1002, "Find &Previous\tShift+F3")
self.Bind(wx.EVT_MENU, self.on_find_next, id=1001)
self.Bind(wx.EVT_MENU, self.on_find_previous, id=1002)
# Channel menu
channel_menu = wx.Menu()
channel_menu.Append(101, "&Join Channel\tCtrl+J")
channel_menu.Append(102, "&Part Channel\tCtrl+P")
channel_menu.AppendSeparator()
channel_menu.Append(103, "Close &Tab\tCtrl+W")
self.Bind(wx.EVT_MENU, self.on_menu_join, id=101)
self.Bind(wx.EVT_MENU, self.on_menu_part, id=102)
self.Bind(wx.EVT_MENU, self.on_menu_close_tab, id=103)
# Tools menu
tools_menu = wx.Menu()
tools_menu.Append(201, "&WHOIS User\tCtrl+I")
tools_menu.Append(202, "Change &Nick\tCtrl+N")
tools_menu.AppendSeparator()
self.away_item = tools_menu.AppendCheckItem(203, "&Away\tCtrl+A")
self.timestamp_item = tools_menu.AppendCheckItem(204, "Show &Timestamps")
self.timestamp_item.Check(True)
tools_menu.AppendSeparator()
tools_menu.Append(205, "Set &Highlights")
tools_menu.Append(206, "Auto-join Channels")
tools_menu.Append(207, "Command Help")
self.Bind(wx.EVT_MENU, self.on_menu_whois, id=201)
self.Bind(wx.EVT_MENU, self.on_menu_change_nick, id=202)
self.Bind(wx.EVT_MENU, self.on_menu_away, id=203)
self.Bind(wx.EVT_MENU, self.on_menu_timestamps, id=204)
self.Bind(wx.EVT_MENU, self.on_menu_highlights, id=205)
self.Bind(wx.EVT_MENU, self.on_menu_autojoin, id=206)
self.Bind(wx.EVT_MENU, self.on_menu_help, id=207)
menubar.Append(file_menu, "&File")
menubar.Append(edit_menu, "&Edit")
menubar.Append(channel_menu, "&Channel")
menubar.Append(tools_menu, "&Tools")
self.SetMenuBar(menubar)
except Exception as e:
logger.error(f"Error creating menu: {e}")
def on_about(self, event):
"""Show About dialog"""
try:
dlg = AboutDialog(self)
dlg.ShowModal()
dlg.Destroy()
except Exception as e:
logger.error(f"Error showing about dialog: {e}")
def setup_irc_handlers(self):
try:
self.reactor = irc.client.Reactor()
self.reactor.add_global_handler("all_events", self.on_all_events) # Catch all events
self.reactor.add_global_handler("welcome", self.on_welcome)
self.reactor.add_global_handler("join", self.on_join)
self.reactor.add_global_handler("part", self.on_part)
self.reactor.add_global_handler("quit", self.on_quit)
self.reactor.add_global_handler("pubmsg", self.on_pubmsg)
self.reactor.add_global_handler("privmsg", self.on_privmsg)
self.reactor.add_global_handler("namreply", self.on_namreply)
self.reactor.add_global_handler("nick", self.on_nick)
self.reactor.add_global_handler("mode", self.on_mode)
self.reactor.add_global_handler("notice", self.on_notice)
self.reactor.add_global_handler("disconnect", self.on_disconnect)
self.reactor.add_global_handler("topic", self.on_topic)
self.reactor.add_global_handler("kick", self.on_kick)
self.reactor.add_global_handler("whoisuser", self.on_whoisuser)
self.reactor.add_global_handler("whoischannels", self.on_whoischannels)
self.reactor.add_global_handler("whoisserver", self.on_whoisserver)
except Exception as e:
logger.error(f"Error setting up IRC handlers: {e}")
def on_all_events(self, connection, event):
"""Catch-all handler to log ALL server events in the Server tab"""
try:
# Don't log certain very frequent events to avoid spam
if event.type in ("pubmsg", "privmsg", "action"):
return
# Format the raw event for display
event_info = f"RAW: {event.type.upper()}"
if hasattr(event, 'source') and event.source:
event_info += f" from {event.source}"
if hasattr(event, 'target') and event.target:
event_info += f" to {event.target}"
if hasattr(event, 'arguments') and event.arguments:
event_info += f" - {' '.join(event.arguments)}"
self.log_server(event_info, wx.Colour(180, 180, 255), italic=True)
except Exception as e:
logger.error(f"Error in all_events handler: {e}")
def get_timestamp(self):
try:
if self.timestamps:
return f"[{datetime.now().strftime('%H:%M:%S')}] "
return ""
except Exception as e:
logger.error(f"Error getting timestamp: {e}")
return ""
def is_connected(self):
"""Thread-safe connection check"""
with self.connection_lock:
return self.connection and self.connection.is_connected()
def on_connect(self, event):
try:
if not self.is_connected() and not self.is_connecting:
self.is_connecting = True
self.server = self.server_ctrl.GetValue()
try:
self.port = int(self.port_ctrl.GetValue())
except ValueError:
self.safe_ui_update(self.log_server, "Invalid port number", wx.Colour(255, 100, 100))
self.is_connecting = False
return
self.nick = self.nick_ctrl.GetValue()
if not self.server or not self.nick:
self.safe_ui_update(self.log_server, "Server and nick are required", wx.Colour(255, 100, 100))
self.is_connecting = False
return
self.safe_ui_update(self.connect_btn.Enable, False)
self.safe_ui_update(self.log_server, f"Connecting to {self.server}:{self.port} as {self.nick}...", wx.Colour(150, 150, 255))
def connect_thread():
try:
logger.info(f"Attempting connection to {self.server}:{self.port}")
self.connection = self.reactor.server().connect(
self.server, self.port, self.nick,
username=self.nick, ircname=self.nick,
connect_factory=irc.connection.Factory()
)
self.reactor_thread = threading.Thread(
target=self.safe_reactor_loop,
name="IRC-Reactor",
daemon=True
)
self.reactor_thread.start()
self.safe_ui_update(self.on_connect_success)
except irc.client.ServerConnectionError as e:
error_msg = f"Connection error: {e}"
logger.error(error_msg)
self.safe_ui_update(self.on_connect_failed, error_msg)
except socket.gaierror as e:
error_msg = f"DNS resolution failed: {e}"
logger.error(error_msg)
self.safe_ui_update(self.on_connect_failed, error_msg)
except Exception as e:
error_msg = f"Unexpected connection error: {e}"
logger.error(error_msg)
self.safe_ui_update(self.on_connect_failed, error_msg)
threading.Thread(target=connect_thread, name="IRC-Connect", daemon=True).start()
elif self.is_connected():
self.disconnect()
except Exception as e:
logger.error(f"Error in connect handler: {e}")
self.is_connecting = False
self.safe_ui_update(self.connect_btn.Enable, True)
def safe_reactor_loop(self):
"""Safely run the reactor loop with exception handling"""
try:
self.reactor.process_forever()
except Exception as e:
logger.error(f"Reactor loop error: {e}")
self.safe_ui_update(self.log_server, f"Connection error: {e}", wx.Colour(255, 100, 100))
self.safe_ui_update(self.on_disconnect_cleanup)
def on_connect_success(self):
"""Handle successful connection"""
self.is_connecting = False
self.connect_btn.SetLabel("Disconnect")
self.connect_btn.Enable(True)
self.SetStatusText(f"Connected to {self.server} - Use /help for commands, Ctrl+F to search, Ctrl+Esc to quick exit")
logger.info(f"Successfully connected to {self.server}")
def on_connect_failed(self, error_msg):
"""Handle connection failure"""
self.is_connecting = False
self.log_server(error_msg, wx.Colour(255, 100, 100))
self.connect_btn.Enable(True)
self.SetStatusText("Connection failed")
logger.error(f"Connection failed: {error_msg}")
def disconnect(self):
"""Safely disconnect from IRC server"""
try:
with self.connection_lock:
if self.connection and self.connection.is_connected():
self.is_disconnecting = True
self.connection.quit("Goodbye")
# Give it a moment to send the quit message
threading.Timer(1.0, self.force_disconnect).start()
else:
self.on_disconnect_cleanup()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
self.on_disconnect_cleanup()
def force_disconnect(self):
"""Force disconnect if graceful quit fails"""
try:
with self.connection_lock:
if self.connection:
self.connection.close()
self.on_disconnect_cleanup()
except Exception as e:
logger.error(f"Error during force disconnect: {e}")
self.on_disconnect_cleanup()
def on_disconnect_cleanup(self):
"""Clean up after disconnect"""
with self.connection_lock:
self.connection = None
self.is_connecting = False
self.is_disconnecting = False
self.safe_ui_update(self.connect_btn.SetLabel, "Connect")
self.safe_ui_update(self.connect_btn.Enable, True)
self.safe_ui_update(self.SetStatusText, "Disconnected")
self.safe_ui_update(self.log_server, "Disconnected from server", wx.Colour(255, 100, 100))
def on_join_channel(self, event):
try:
channel = self.channel_input.GetValue().strip()
if channel and self.is_connected():
if not channel.startswith('#'):
channel = '#' + channel
self.connection.join(channel)
self.channel_input.Clear()
except Exception as e:
logger.error(f"Error joining channel: {e}")
self.safe_ui_update(self.log_server, f"Error joining channel: {e}", wx.Colour(255, 100, 100))
def on_channel_select(self, event):
try:
selection = self.channel_list.GetSelection()
if selection != wx.NOT_FOUND:
channel = self.channel_list.GetString(selection)
self.switch_to_channel(channel)
except Exception as e:
logger.error(f"Error selecting channel: {e}")
def on_channel_right_click(self, event):
try:
selection = self.channel_list.HitTest(event.GetPosition())
if selection != wx.NOT_FOUND:
channel = self.channel_list.GetString(selection)
menu = wx.Menu()
menu.Append(1, "Part Channel")
menu.Append(2, "Close Tab")
self.Bind(wx.EVT_MENU, lambda e: self.part_channel(channel), id=1)
self.Bind(wx.EVT_MENU, lambda e: self.close_channel(channel), id=2)
self.PopupMenu(menu)
menu.Destroy()
except Exception as e:
logger.error(f"Error in channel right click: {e}")
def on_user_dclick(self, event):
try:
selection = self.users_list.GetSelection()
if selection != wx.NOT_FOUND:
user = self.users_list.GetString(selection)
self.open_query(user)
except Exception as e:
logger.error(f"Error in user double click: {e}")
def on_user_right_click(self, event):
try:
selection = self.users_list.HitTest(event.GetPosition())
if selection != wx.NOT_FOUND:
user = self.users_list.GetString(selection)
menu = wx.Menu()
menu.Append(1, f"Query {user}")
menu.Append(2, f"WHOIS {user}")
if self.current_channel and self.current_channel.startswith('#'):
menu.AppendSeparator()
menu.Append(3, f"Kick {user}")
self.Bind(wx.EVT_MENU, lambda e: self.open_query(user), id=1)
self.Bind(wx.EVT_MENU, lambda e: self.whois_user(user), id=2)
if self.current_channel and self.current_channel.startswith('#'):
self.Bind(wx.EVT_MENU, lambda e: self.kick_user(user), id=3)
self.PopupMenu(menu)
menu.Destroy()
except Exception as e:
logger.error(f"Error in user right click: {e}")
def open_query(self, user):
try:
if user not in self.channels:
self.add_channel(user)
self.switch_to_channel(user)
except Exception as e:
logger.error(f"Error opening query: {e}")
def whois_user(self, user):
try:
if self.is_connected():
self.connection.whois([user])
except Exception as e:
logger.error(f"Error in WHOIS: {e}")
def kick_user(self, user):
try:
if self.current_channel and self.current_channel.startswith('#') and self.is_connected():
reason = wx.GetTextFromUser("Kick reason:", "Kick User", "Kicked")
if reason is not None:
self.connection.kick(self.current_channel, user, reason)
except Exception as e:
logger.error(f"Error kicking user: {e}")
def switch_to_channel(self, channel):
try:
self.current_channel = channel
for i in range(self.notebook.GetPageCount()):
if self.notebook.GetPageText(i) == channel:
self.notebook.SetSelection(i)
break
self.update_user_list(channel)
except Exception as e:
logger.error(f"Error switching channel: {e}")
def update_user_list(self, channel):
"""Thread-safe user list update"""
def _update_user_list():
try:
self.users_list.Clear()
if channel in self.channel_users:
for user in sorted(self.channel_users[channel]):
self.users_list.Append(user)
except Exception as e:
logger.error(f"Error updating user list: {e}")
self.safe_ui_update(_update_user_list)
def send_message(self, target, message):
try:
if message.startswith('/'):
self.handle_command(target, message)
elif target != "SERVER" and self.is_connected():
self.connection.privmsg(target, message)
if target in self.channels:
# Show our own message with our color
user_color = self.get_user_color(self.nick)
timestamp = self.get_timestamp()
self.channels[target].add_formatted_message(timestamp, self.nick, message, user_color)
except Exception as e:
logger.error(f"Error sending message: {e}")
self.safe_ui_update(self.log_server, f"Error sending message: {e}", wx.Colour(255, 100, 100))
def handle_command(self, target, message):
try:
parts = message[1:].split(' ', 1)
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
if cmd == "help":
help_text = """
Available commands:
/join <channel> - Join a channel
/part [channel] - Leave current or specified channel
/msg <nick> <message> - Send private message
/me <action> - Send action message
/nick <newnick> - Change nickname
/whois <nick> - Get user information
/topic [newtopic] - Get or set channel topic
/kick <user> [reason] - Kick user from channel
/away [message] - Set away status
/quit [message] - Disconnect from server
/help - Show this help
"""
self.log_server(help_text, wx.Colour(200, 255, 200))
elif cmd == "me":
if self.is_connected():
self.connection.action(target, args)
user_color = self.get_user_color(self.nick)
timestamp = self.get_timestamp()
self.channels[target].add_formatted_message(timestamp, self.nick, args, user_color, is_action=True)
elif cmd == "nick" and self.is_connected():
self.connection.nick(args)
elif cmd == "join" and self.is_connected():
if args and not args.startswith('#'):
args = '#' + args
self.connection.join(args)
elif cmd == "part" and self.is_connected():
channel = args if args else target
self.connection.part(channel)
elif cmd == "quit" and self.is_connected():
reason = args if args else "Goodbye"
self.connection.quit(reason)
elif cmd == "msg" and self.is_connected():
nick_msg = args.split(' ', 1)
if len(nick_msg) == 2:
self.connection.privmsg(nick_msg[0], nick_msg[1])
elif cmd == "whois" and self.is_connected():
self.connection.whois([args])
elif cmd == "kick" and self.is_connected():
kick_args = args.split(' ', 1)
user = kick_args[0]
reason = kick_args[1] if len(kick_args) > 1 else "Kicked"
self.connection.kick(target, user, reason)
elif cmd == "topic" and self.is_connected():
if args:
self.connection.topic(target, args)
else:
self.connection.topic(target)
elif cmd == "away" and self.is_connected():
self.connection.send_raw(f"AWAY :{args}" if args else "AWAY")
self.away = bool(args)
self.safe_ui_update(self.away_item.Check, self.away)
else:
self.safe_ui_update(self.log_server, f"Unknown command: {cmd}. Use /help for available commands.", wx.Colour(255, 100, 100))
except Exception as e:
logger.error(f"Error handling command: {e}")
self.safe_ui_update(self.log_server, f"Error executing command: {e}", wx.Colour(255, 100, 100))
def part_channel(self, channel):
try:
if self.is_connected():
self.connection.part(channel)
except Exception as e:
logger.error(f"Error parting channel: {e}")
def close_channel(self, channel):
try:
if channel in self.channels and channel != "SERVER":
def _close_channel():
for i in range(self.notebook.GetPageCount()):
if self.notebook.GetPageText(i) == channel:
self.notebook.DeletePage(i)
break
del self.channels[channel]
idx = self.channel_list.FindString(channel)
if idx != wx.NOT_FOUND:
self.channel_list.Delete(idx)
self.safe_ui_update(_close_channel)
except Exception as e:
logger.error(f"Error closing channel: {e}")
def log_server(self, message, color=None, bold=False, italic=False, underline=False):
try:
if "SERVER" in self.channels:
self.channels["SERVER"].add_system_message(f"{self.get_timestamp()}{message}", color, bold)
except Exception as e:
logger.error(f"Error logging server message: {e}")
def log_channel_message(self, channel, username, message, is_action=False, is_system=False):
"""Log a message to a channel with username coloring"""
try:
if channel not in self.channels:
self.safe_ui_update(self.add_channel, channel)
if channel in self.channels:
timestamp = self.get_timestamp()
if is_system:
# System messages (joins, parts, etc.)
user_color = self.get_user_color(username)
self.channels[channel].add_system_message(f"{timestamp}{message}", user_color)
elif is_action:
# Action messages (/me)
user_color = self.get_user_color(username)
self.channels[channel].add_formatted_message(timestamp, username, message, user_color, is_action=True)
else:
# Regular messages
user_color = self.get_user_color(username)
self.channels[channel].add_formatted_message(timestamp, username, message, user_color)
# Check for highlights
if self.highlights and any(h.lower() in message.lower() for h in self.highlights):
self.safe_ui_update(wx.Bell)
except Exception as e:
logger.error(f"Error logging channel message: {e}")
def add_channel(self, channel):
try:
if channel not in self.channels:
panel = IRCPanel(self.notebook, self)
panel.set_target(channel)
self.notebook.AddPage(panel, channel)
self.channels[channel] = panel
if channel.startswith('#'):
self.channel_list.Append(channel)
except Exception as e:
logger.error(f"Error adding channel: {e}")
# Menu handlers
def on_menu_join(self, event):
try:
dlg = wx.TextEntryDialog(self, "Enter channel name:", "Join Channel")
if dlg.ShowModal() == wx.ID_OK:
channel = dlg.GetValue()
if channel and self.is_connected():
if not channel.startswith('#'):
channel = '#' + channel
self.connection.join(channel)
dlg.Destroy()
except Exception as e:
logger.error(f"Error in menu join: {e}")
def on_menu_part(self, event):
try:
if self.current_channel and self.current_channel != "SERVER":
self.part_channel(self.current_channel)
except Exception as e:
logger.error(f"Error in menu part: {e}")
def on_menu_close_tab(self, event):
try:
if self.current_channel and self.current_channel != "SERVER":
self.close_channel(self.current_channel)
except Exception as e:
logger.error(f"Error in menu close tab: {e}")
def on_menu_whois(self, event):
try:
dlg = wx.TextEntryDialog(self, "Enter nickname:", "WHOIS")
if dlg.ShowModal() == wx.ID_OK:
user = dlg.GetValue()
if user and self.is_connected():
self.whois_user(user)
dlg.Destroy()
except Exception as e:
logger.error(f"Error in menu whois: {e}")
def on_menu_change_nick(self, event):
try:
dlg = wx.TextEntryDialog(self, "Enter new nickname:", "Change Nick", self.nick)
if dlg.ShowModal() == wx.ID_OK:
new_nick = dlg.GetValue()
if new_nick and self.is_connected():
self.connection.nick(new_nick)
dlg.Destroy()
except Exception as e:
logger.error(f"Error in menu change nick: {e}")
def on_menu_away(self, event):
try:
if self.away and self.is_connected():
self.connection.send_raw("AWAY")
self.away = False
self.SetStatusText(f"Connected to {self.server}")
elif self.is_connected():
msg = wx.GetTextFromUser("Away message:", "Set Away", "Away from keyboard")
if msg is not None:
self.connection.send_raw(f"AWAY :{msg}")
self.away = True
self.SetStatusText(f"Connected to {self.server} (Away)")
except Exception as e:
logger.error(f"Error in menu away: {e}")
def on_menu_timestamps(self, event):
try:
self.timestamps = self.timestamp_item.IsChecked()
except Exception as e:
logger.error(f"Error in menu timestamps: {e}")
def on_menu_highlights(self, event):
try:
current = ", ".join(self.highlights)
dlg = wx.TextEntryDialog(self, "Enter highlight words (comma separated):",
"Set Highlights", current)
if dlg.ShowModal() == wx.ID_OK:
text = dlg.GetValue()
self.highlights = [h.strip() for h in text.split(',') if h.strip()]
dlg.Destroy()
except Exception as e:
logger.error(f"Error in menu highlights: {e}")
def on_menu_autojoin(self, event):
try:
current = ", ".join(self.auto_join_channels)
dlg = wx.TextEntryDialog(self, "Enter channels to auto-join (comma separated):",
"Auto-join Channels", current)
if dlg.ShowModal() == wx.ID_OK:
text = dlg.GetValue()
self.auto_join_channels = [c.strip() for c in text.split(',') if c.strip()]
dlg.Destroy()
except Exception as e:
logger.error(f"Error in menu autojoin: {e}")
def on_menu_help(self, event):
"""Show command help"""
help_text = """
IRC Client Help:
BASIC USAGE:
- Configure server/nick in left panel and click Connect
- Join channels using the join box or /join command
- Type messages in the input box at bottom
- Use Up/Down arrows for message history
- Tab for nickname completion in channels
- Ctrl+F to search in chat history
- Ctrl+Esc to quickly exit the application
TEXT FORMATTING:
- Usernames are colored for easy identification
- URLs are automatically clickable
- Each user has a consistent color
COMMANDS (type /help in chat for full list):
/join #channel - Join a channel
/part - Leave current channel
/msg nick message - Private message
/me action - Action message
/nick newname - Change nickname
/away [message] - Set away status
"""
self.log_server(help_text, wx.Colour(200, 255, 200), bold=True)
# IRC Event Handlers - All use thread-safe UI updates
def on_welcome(self, connection, event):
try:
self.log_server("Connected to server!", wx.Colour(100, 255, 100), bold=True)
self.log_server(f"Welcome message: {' '.join(event.arguments)}", wx.Colour(150, 255, 150))
# Auto-join channels
for channel in self.auto_join_channels:
if not channel.startswith('#'):
channel = '#' + channel
time.sleep(0.5)
if self.is_connected():
connection.join(channel)
except Exception as e:
logger.error(f"Error in welcome handler: {e}")
def on_join(self, connection, event):
try:
nick = event.source.nick
channel = event.target
if nick == self.nick:
self.safe_ui_update(self.add_channel, channel)
self.log_server(f"Joined channel {channel}", wx.Colour(100, 255, 100))
self.log_channel_message(channel, nick, f"{nick} joined", is_system=True)
if nick not in self.channel_users[channel]:
self.channel_users[channel].append(nick)
if channel == self.current_channel:
self.update_user_list(channel)
except Exception as e:
logger.error(f"Error in join handler: {e}")
def on_part(self, connection, event):
try:
nick = event.source.nick
channel = event.target
reason = event.arguments[0] if event.arguments else ""
msg = f"{nick} left"
if reason:
msg += f" ({reason})"
self.log_channel_message(channel, nick, msg, is_system=True)
if nick in self.channel_users[channel]:
self.channel_users[channel].remove(nick)
if channel == self.current_channel:
self.update_user_list(channel)
except Exception as e:
logger.error(f"Error in part handler: {e}")
def on_quit(self, connection, event):
try:
nick = event.source.nick
reason = event.arguments[0] if event.arguments else "Quit"
for channel in list(self.channel_users.keys()):
if nick in self.channel_users[channel]:
self.channel_users[channel].remove(nick)
self.log_channel_message(channel, nick, f"{nick} quit ({reason})", is_system=True)
if self.current_channel:
self.update_user_list(self.current_channel)
except Exception as e:
logger.error(f"Error in quit handler: {e}")
def on_pubmsg(self, connection, event):
try:
nick = event.source.nick
channel = event.target
message = event.arguments[0]
# Check for action messages (/me)
if message.startswith('\x01ACTION') and message.endswith('\x01'):
message = message[8:-1]
self.log_channel_message(channel, nick, message, is_action=True)
else:
self.log_channel_message(channel, nick, message)
# Highlight own nick in messages
if self.nick.lower() in message.lower():
self.safe_ui_update(wx.Bell)
except Exception as e:
logger.error(f"Error in pubmsg handler: {e}")
def on_privmsg(self, connection, event):
try:
nick = event.source.nick
message = event.arguments[0]
# Check for action messages in private queries too
if message.startswith('\x01ACTION') and message.endswith('\x01'):
message = message[8:-1]
self.log_channel_message(nick, nick, message, is_action=True)
else:
self.log_channel_message(nick, nick, message)
except Exception as e:
logger.error(f"Error in privmsg handler: {e}")
def on_namreply(self, connection, event):
try:
channel = event.arguments[1]
users = event.arguments[2].split()
clean_users = []
for user in users:
user = user.lstrip("@+%&~")
clean_users.append(user)
self.channel_users[channel].extend(clean_users)
if channel == self.current_channel:
self.update_user_list(channel)
except Exception as e:
logger.error(f"Error in namreply handler: {e}")
def on_nick(self, connection, event):
try:
old_nick = event.source.nick
new_nick = event.target
# Update color mapping for the new nick
if old_nick in self.user_colors:
self.user_colors[new_nick] = self.user_colors[old_nick]
del self.user_colors[old_nick]
if old_nick == self.nick:
self.nick = new_nick
self.log_server(f"You are now known as {new_nick}", wx.Colour(150, 200, 255), bold=True)
for channel in self.channel_users:
if old_nick in self.channel_users[channel]:
self.channel_users[channel].remove(old_nick)
self.channel_users[channel].append(new_nick)
self.log_channel_message(channel, new_nick, f"{old_nick} is now known as {new_nick}", is_system=True)
if self.current_channel:
self.update_user_list(self.current_channel)
except Exception as e:
logger.error(f"Error in nick handler: {e}")
def on_mode(self, connection, event):
try:
channel = event.target
mode = " ".join(event.arguments)
nick = event.source.nick
self.log_channel_message(channel, nick, f"Mode {mode} by {nick}", is_system=True)
except Exception as e:
logger.error(f"Error in mode handler: {e}")
def on_notice(self, connection, event):
try:
nick = event.source.nick if hasattr(event.source, 'nick') else str(event.source)
message = event.arguments[0]
self.log_server(f"-{nick}- {message}", wx.Colour(255, 150, 255), italic=True)
except Exception as e:
logger.error(f"Error in notice handler: {e}")
def on_disconnect(self, connection, event):
try:
self.log_server("Disconnected from server", wx.Colour(255, 100, 100), bold=True)
self.safe_ui_update(self.on_disconnect_cleanup)
except Exception as e:
logger.error(f"Error in disconnect handler: {e}")
def on_topic(self, connection, event):
try:
channel = event.arguments[0] if event.arguments else event.target
topic = event.arguments[1] if len(event.arguments) > 1 else event.arguments[0]
nick = event.source.nick if hasattr(event.source, 'nick') else "Server"
self.log_channel_message(channel, nick, f"Topic: {topic}", is_system=True)
except Exception as e:
logger.error(f"Error in topic handler: {e}")
def on_kick(self, connection, event):
try:
channel = event.target
kicked = event.arguments[0]
reason = event.arguments[1] if len(event.arguments) > 1 else "No reason"
kicker = event.source.nick
self.log_channel_message(channel, kicker, f"{kicked} was kicked by {kicker} ({reason})", is_system=True)
if kicked in self.channel_users[channel]:
self.channel_users[channel].remove(kicked)
if channel == self.current_channel:
self.update_user_list(channel)
except Exception as e:
logger.error(f"Error in kick handler: {e}")
def on_whoisuser(self, connection, event):
try:
nick = event.arguments[0]
user = event.arguments[1]
host = event.arguments[2]
realname = event.arguments[4]
user_color = self.get_user_color(nick)
self.log_server(f"WHOIS {nick}: {user}@{host} ({realname})", user_color)
except Exception as e:
logger.error(f"Error in whoisuser handler: {e}")
def on_whoischannels(self, connection, event):
try:
nick = event.arguments[0]
channels = event.arguments[1]
user_color = self.get_user_color(nick)
self.log_server(f"WHOIS {nick} channels: {channels}", user_color)
except Exception as e:
logger.error(f"Error in whoischannels handler: {e}")
def on_whoisserver(self, connection, event):
try:
nick = event.arguments[0]
server = event.arguments[1]
user_color = self.get_user_color(nick)
self.log_server(f"WHOIS {nick} server: {server}", user_color)
except Exception as e:
logger.error(f"Error in whoisserver handler: {e}")
def on_close(self, event):
try:
# Stop UI timer first
if self.ui_timer and self.ui_timer.IsRunning():
self.ui_timer.Stop()
if self.is_connected():
self.disconnect()
# Give it a moment to disconnect gracefully
wx.CallLater(1000, self.Destroy)
else:
self.Destroy()
except Exception as e:
logger.error(f"Error during close: {e}")
self.Destroy()
if __name__ == "__main__":
try:
app = wx.App()
frame = IRCFrame()
app.MainLoop()
except Exception as e:
logger.critical(f"Fatal error: {e}")
print(f"Fatal error: {e}")
traceback.print_exc()