Files
baythn_bot/main.py
2025-12-19 20:40:02 +01:00

850 lines
28 KiB
Python

import discord
from discord.ext import commands, tasks
from discord.ui import Button, View, Select, Modal, TextInput
import aiohttp
import os
from datetime import datetime
from typing import List, Dict, Optional
from dotenv import load_dotenv
from CommitsActionView import CommitActionsView
import SearchModal
import FileBrowserSelect
from ComitsPaginatedView import CommitsPaginatedView
# Load environment variables from .env file
load_dotenv()

# Configuration
DISCORD_TOKEN = os.getenv('DISCORD_BOT_TOKEN', '').strip()
# A blank or non-numeric DISCORD_CHANNEL_ID must not abort the import with a
# ValueError: fall back to 0, which the start-up check in the
# `if __name__ == '__main__'` section reports as a configuration error.
try:
    CHANNEL_ID = int(os.getenv('DISCORD_CHANNEL_ID', '0').strip() or '0')
except ValueError:
    CHANNEL_ID = 0
GITEA_BASE_URL = 'http://rattatwinko.servecounterstrike.com/gitea'
GITEA_API_BASE = f'{GITEA_BASE_URL}/api/v1'
REPO_OWNER = 'rattatwinko'
REPO_NAME = 'INF6B'
BRANCH = 'main'  # reassigned at runtime by the `!monitor` command
CHECK_INTERVAL = 60  # seconds

# Bot setup
intents = discord.Intents.default()
intents.message_content = True
intents.guilds = True
intents.members = True  # For better interactions
bot = commands.Bot(
    command_prefix='!',
    intents=intents,
    help_command=None,  # We'll create custom help
)

# Store last checked commit (None until the first successful poll)
last_commit_sha = None
# ---------- API FUNCTIONS ----------
async def fetch_commits(limit=10, sha=None):
    """Return recent commits of the monitored repository from the Gitea API.

    Args:
        limit: Value sent as the ``limit`` query parameter.
        sha: Ref to list from; the module-level ``BRANCH`` is used when falsy.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``. Non-200
        statuses and exceptions are printed rather than raised.
    """
    try:
        endpoint = f'{GITEA_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/commits'
        query = {'sha': sha or BRANCH, 'limit': limit}
        async with aiohttp.ClientSession() as http:
            async with http.get(endpoint, params=query) as response:
                # Guard clause: anything but 200 is reported and dropped.
                if response.status != 200:
                    print(f"Error fetching commits: {response.status}")
                    return None
                return await response.json()
    except Exception as exc:
        print(f"Exception fetching commits: {exc}")
        return None
async def fetch_commit_files(sha):
    """Fetch the list of files changed by a specific commit.

    Args:
        sha: Commit hash placed in the ``git/commits/{sha}`` API path.

    Returns:
        The ``files`` list from the response (an empty list when the key is
        missing or null) on HTTP 200, otherwise ``None``. Failures are
        printed, matching ``fetch_commits``, instead of being raised.
    """
    try:
        url = f'{GITEA_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/git/commits/{sha}'
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    # Previously silent; log it like fetch_commits does.
                    print(f"Error fetching commit files: {resp.status}")
                    return None
                data = await resp.json()
                # `or []` also covers an explicit JSON null for "files".
                return data.get('files') or []
    except Exception as e:
        print(f"Exception fetching commit files: {e}")
        return None
async def fetch_commit_diff(sha):
    """Fetch the raw diff text for a commit.

    Args:
        sha: Commit hash placed in the ``git/commits/{sha}.diff`` API path.

    Returns:
        The diff as ``str`` on HTTP 200, otherwise ``None``. The body is
        decoded as UTF-8, falling back to latin-1 when that fails. Failures
        are printed, matching ``fetch_commits``, instead of being raised.
    """
    try:
        url = f'{GITEA_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/git/commits/{sha}.diff'
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    # Previously silent; log it like fetch_commits does.
                    print(f"Error fetching diff: {resp.status}")
                    return None
                try:
                    return await resp.text(encoding='utf-8')
                except UnicodeDecodeError:
                    # latin-1 maps every byte, so this decode cannot fail.
                    return await resp.text(encoding='latin-1')
    except Exception as e:
        print(f"Exception fetching diff: {e}")
        return None
def format_diff_stats(files, max_listed=10):
    """Format file changes into a readable string plus line-count totals.

    Args:
        files: Sequence of dicts that may carry ``filename``, ``additions``,
            ``deletions`` and ``status`` keys (missing keys get defaults).
        max_listed: Maximum number of files listed individually; any
            remainder is summarised in a trailing "... and N more files" line.

    Returns:
        A ``(text, total_additions, total_deletions)`` tuple. The totals
        cover *every* file, not only the ones that are listed.
    """
    if not files:
        return "No file changes available", 0, 0

    status_icons = {'added': '🆕', 'removed': '🗑️', 'renamed': '📝'}

    # Sum over all files so the totals agree with the "... and N more" line;
    # `or 0` tolerates a missing key as well as an explicit None.
    total_additions = sum(entry.get('additions') or 0 for entry in files)
    total_deletions = sum(entry.get('deletions') or 0 for entry in files)

    stats = []
    for entry in files[:max_listed]:
        filename = entry.get('filename', 'Unknown')
        additions = entry.get('additions') or 0
        deletions = entry.get('deletions') or 0
        # Any status other than added/removed/renamed gets the generic icon.
        icon = status_icons.get(entry.get('status', 'modified'), '📄')
        stats.append(f"{icon} `{filename}` (+{additions}/-{deletions})")

    hidden = len(files) - max_listed
    if hidden > 0:
        stats.append(f"... and {hidden} more files")
    return "\n".join(stats), total_additions, total_deletions
def create_commit_embed(commit, files=None):
    """Create a Discord embed describing a commit.

    Args:
        commit: Commit dict; this function reads ``sha``, ``html_url``,
            ``commit.message``, ``commit.author.name``,
            ``commit.committer.date`` and, when present, ``author.avatar_url``.
        files: Optional list of changed-file dicts; when truthy a
            "File Changes" field built by ``format_diff_stats`` is added.

    Returns:
        The populated ``discord.Embed``.
    """
    details = commit['commit']
    subject, _, body = details['message'].partition('\n')

    # Discord rejects embed titles longer than 256 characters, so truncate
    # after the prefix has been added, not before.
    title = f"📝 Commit: {subject}"[:256]
    description = body[:500]

    embed = discord.Embed(
        title=title,
        description=description if description else None,
        color=discord.Color.green(),
        # fromisoformat() on older Pythons does not accept a trailing "Z".
        timestamp=datetime.fromisoformat(details['committer']['date'].replace('Z', '+00:00')),
        url=commit['html_url']
    )

    author_name = details['author']['name']
    # `author` may be missing/null and need not carry an avatar.
    avatar_url = (commit.get('author') or {}).get('avatar_url')
    embed.set_author(name=author_name, icon_url=avatar_url)

    embed.add_field(name="👤 Author", value=author_name, inline=True)
    embed.add_field(name="🔑 Commit Hash", value=f"`{commit['sha'][:7]}`", inline=True)
    embed.add_field(name="🌿 Branch", value=BRANCH, inline=True)

    if files:
        diff_text, additions, deletions = format_diff_stats(files)
        # Embed field values are capped at 1024 characters; long paths can
        # push the listing past that and fail the whole send.
        if len(diff_text) > 1024:
            diff_text = diff_text[:1021] + '...'
        embed.add_field(
            name=f"📊 File Changes (+{additions}/-{deletions})",
            value=diff_text,
            inline=False
        )

    embed.set_footer(text=f"{REPO_OWNER}/{REPO_NAME} • Use buttons below to explore")
    return embed
# ---------- INTERACTIVE FUNCTIONS ----------
def _split_diff_into_chunks(diff_text, max_chars=1800, max_lines=40):
    """Split diff text into chunks of at most max_chars characters and max_lines lines."""
    chunks = []
    current = []
    current_length = 0
    step = max_chars - 1  # leaves room for the newline counted per line
    for raw_line in diff_text.split('\n'):
        # Hard-wrap a single over-long line so no chunk can exceed the budget;
        # max(..., 1) keeps empty lines as one empty piece.
        for start in range(0, max(len(raw_line), 1), step):
            piece = raw_line[start:start + step]
            piece_length = len(piece) + 1
            if current and (current_length + piece_length > max_chars
                            or len(current) >= max_lines):
                chunks.append('\n'.join(current))
                current = []
                current_length = 0
            current.append(piece)
            current_length += piece_length
    if current:
        chunks.append('\n'.join(current))
    return chunks


async def show_commit_diff_interactive(interaction: discord.Interaction, commit_hash: str):
    """Send the diff of a commit as ephemeral follow-up messages.

    The commit is looked up by hash prefix among the 50 commits returned by
    ``fetch_commits(50)``. All output goes through ``interaction.followup``,
    so the caller is expected to have already responded to or deferred the
    interaction.

    Args:
        interaction: Interaction to answer.
        commit_hash: Full or abbreviated commit hash.
    """
    try:
        commits = await fetch_commits(50)
        if not commits:
            # fetch_commits returns None on failure; do not iterate it.
            await interaction.followup.send("Could not fetch commits.", ephemeral=True)
            return

        matching_commit = next(
            (c for c in commits if c['sha'].startswith(commit_hash)), None
        )
        if not matching_commit:
            await interaction.followup.send("Commit not found.", ephemeral=True)
            return

        diff_text = await fetch_commit_diff(matching_commit['sha'])
        if not diff_text:
            await interaction.followup.send("Could not fetch diff.", ephemeral=True)
            return

        chunks = _split_diff_into_chunks(diff_text)
        total = len(chunks)

        embed = discord.Embed(
            title=f"📊 Diff for commit `{matching_commit['sha'][:7]}`",
            color=discord.Color.blue()
        )
        if total > 1:
            embed.description = f"Split into **{total}** parts"
        await interaction.followup.send(embed=embed, ephemeral=True)

        for index, chunk in enumerate(chunks, 1):
            # Label each part so readers can tell the messages apart.
            label = f"**Part {index}/{total}**\n" if total > 1 else ""
            await interaction.followup.send(f"{label}```diff\n{chunk}\n```", ephemeral=True)
    except Exception as e:
        await interaction.followup.send(f"Error: {str(e)}", ephemeral=True)
async def show_file_diff(interaction: discord.Interaction, commit_sha: str, filename: str):
    """Show the diff of a single file within a commit, ephemerally.

    The commit's full diff is fetched with ``fetch_commit_diff`` and the
    section whose ``diff --git`` header names ``filename`` is extracted.
    Small diffs are shown inside the embed; larger ones get a 500-character
    preview in the embed and the remainder in follow-up messages.

    Args:
        interaction: Interaction to answer.
        commit_sha: Commit hash passed to ``fetch_commit_diff``.
        filename: Repository-relative path of the file to show.
    """
    # An embed field value may hold at most 1024 characters; this is what is
    # left for the diff once the code fence is accounted for.
    inline_limit = 1024 - len("```diff\n\n```")
    preview_length = 500
    chunk_size = 1800

    async def reply(*args, **kwargs):
        """Use the initial response if still available, else a follow-up."""
        if interaction.response.is_done():
            await interaction.followup.send(*args, ephemeral=True, **kwargs)
        else:
            await interaction.response.send_message(*args, ephemeral=True, **kwargs)

    try:
        full_diff = await fetch_commit_diff(commit_sha)
        if not full_diff:
            await reply("Could not fetch diff.")
            return

        # Match the header on the exact path; a substring test would also
        # accept e.g. "data/x.py" or "x.pyc" when asked for "x.py".
        old_side = f'diff --git a/{filename} '
        new_side = f' b/{filename}'
        file_diff = []
        in_target_file = False
        for line in full_diff.split('\n'):
            if line.startswith('diff --git'):
                if in_target_file:
                    break  # next file's section starts here
                in_target_file = line.startswith(old_side) or line.endswith(new_side)
                if in_target_file:
                    file_diff.append(line)
            elif in_target_file:
                file_diff.append(line)

        if not file_diff:
            await reply(f"No diff found for `{filename}`")
            return

        diff_text = '\n'.join(file_diff)
        embed = discord.Embed(
            title=f"📄 {filename}"[:256],  # embed titles are capped at 256
            description=f"**Commit:** `{commit_sha[:7]}`",
            color=discord.Color.blue()
        )

        if len(diff_text) <= inline_limit:
            embed.add_field(name="Changes", value=f"```diff\n{diff_text}\n```", inline=False)
            await reply(embed=embed)
            return

        embed.add_field(
            name="Diff Preview",
            value=f"```diff\n{diff_text[:preview_length]}...\n```",
            inline=False
        )
        embed.set_footer(text="Diff too large to display fully")
        await reply(embed=embed)

        # Continue right after the preview so no part of the diff is skipped.
        for start in range(preview_length, len(diff_text), chunk_size):
            chunk = diff_text[start:start + chunk_size]
            await interaction.followup.send(f"```diff\n{chunk}\n```", ephemeral=True)
    except Exception as e:
        # The initial response may already be used up at this point.
        await reply(f"Error: {str(e)}")
async def search_commits(interaction: discord.Interaction, search_type: str, search_term: str):
    """Search the 100 commits returned by ``fetch_commits(100)`` and reply with an embed.

    ``search_type`` selects the comparison: ``"message"`` and ``"author"``
    are case-insensitive substring tests, ``"hash"`` is a prefix test on the
    SHA. Any other value matches nothing. At most five hits are listed; all
    replies are ephemeral follow-ups.
    """
    try:
        history = await fetch_commits(100)  # larger window for searching
        if not history:
            await interaction.followup.send("Could not fetch commits.", ephemeral=True)
            return

        needle = search_term.lower()
        predicates = {
            "message": lambda c: needle in c['commit']['message'].lower(),
            "author": lambda c: needle in c['commit']['author']['name'].lower(),
            "hash": lambda c: c['sha'].startswith(needle),
        }
        accept = predicates.get(search_type)
        hits = [c for c in history if accept(c)] if accept else []

        if not hits:
            await interaction.followup.send(f"No commits found matching '{search_term}'", ephemeral=True)
            return

        results = discord.Embed(
            title=f"🔍 Search Results: {len(hits)} commits found",
            description=f"Searching by {search_type} for '{search_term}'",
            color=discord.Color.purple()
        )

        for position, hit in enumerate(hits[:5], start=1):
            msg = hit['commit']['message'].split('\n')[0][:80]
            author = hit['commit']['author']['name']
            sha = hit['sha'][:7]
            results.add_field(
                name=f"{position}. `{sha}` by {author}",
                value=f"```{msg}```\n`!commit view {sha}`",
                inline=False
            )

        overflow = len(hits) - 5
        if overflow > 0:
            results.add_field(
                name="More Results",
                value=f"... and {overflow} more commits\nUse `!commit search` for more specific searches",
                inline=False
            )

        await interaction.followup.send(embed=results, ephemeral=True)
    except Exception as e:
        await interaction.followup.send(f"Error: {str(e)}", ephemeral=True)
# ---------- TASK ----------
@tasks.loop(seconds=CHECK_INTERVAL)
async def check_commits():
    """Periodically check for new commits and announce them in the channel.

    The first successful poll only records the newest SHA as a baseline.
    Later polls announce, oldest first, every fetched commit that precedes
    the recorded SHA in the list, then advance ``last_commit_sha``.
    """
    global last_commit_sha

    commits = await fetch_commits()
    if not commits:
        return

    latest_sha = commits[0]['sha']
    if last_commit_sha is None:
        last_commit_sha = latest_sha
        return
    if latest_sha == last_commit_sha:
        return

    print(f"New commit detected: {latest_sha[:7]}")

    # Everything listed before the last known SHA is new. If that SHA is not
    # in the fetched page, the whole page is treated as new.
    new_commits = []
    for commit in commits:
        if commit['sha'] == last_commit_sha:
            break
        new_commits.append(commit)

    channel = bot.get_channel(CHANNEL_ID)
    if channel is None:
        print(f"Cannot find channel with ID {CHANNEL_ID}; skipping notifications")
    else:
        for commit in reversed(new_commits):
            # Guard the whole per-commit pipeline, not just the send: an
            # exception escaping the loop body would end the background task.
            try:
                files = await fetch_commit_files(commit['sha'])
                embed = create_commit_embed(commit, files)
                # Add action buttons for new commits
                view = CommitActionsView(commit, files)
                await channel.send(embed=embed, view=view)
            except Exception as e:
                print(f"Error sending notification: {e}")

    last_commit_sha = latest_sha
# ---------- INTUITIVE COMMANDS ----------
@bot.command(name="help")
async def help_command(ctx):
    """Show the help embed together with three quick-action buttons.

    The buttons re-invoke the ``commits`` and ``status`` commands on the
    original context, or open the search modal in "message" mode.
    """
    embed = discord.Embed(
        title="🤖 Gitea Monitor Bot Help",
        description=f"Monitoring **{REPO_OWNER}/{REPO_NAME}** on branch **{BRANCH}**\n\n"
                    "**📋 Quick Commands:**",
        color=discord.Color.blue()
    )

    # Main commands
    command_help = (
        ("🔍 `!commits`", "Browse recent commits with pagination"),
        ("📝 `!commit view <hash>`",
         "View details of a specific commit\nExample: `!commit view abc123`"),
        ("🔎 `!commit search`", "Search commits by message, author, or hash"),
        ("📊 `!commit diff <hash>`",
         "Show diff for a commit\nExample: `!commit diff abc123`"),
        ("📁 `!files <hash>`",
         "Browse files changed in a commit\nExample: `!files abc123`"),
        ("📈 `!status`", "Show bot status and latest commit"),
        ("⚙️ `!monitor`",
         "Monitor a different branch or repository\n*(Admin only)*"),
    )
    for name, value in command_help:
        embed.add_field(name=name, value=value, inline=False)

    # `import SearchModal` binds the module, which is not callable. The modal
    # class is presumably the same-named attribute inside it
    # (NOTE(review): confirm); fall back to the bound name itself in case the
    # import is later changed to import the class directly.
    search_modal_cls = getattr(SearchModal, 'SearchModal', SearchModal)

    # Button callbacks. Each one acknowledges the interaction first, because
    # otherwise the client reports the interaction as failed.
    async def browse_callback(interaction: discord.Interaction):
        await interaction.response.defer()
        await ctx.invoke(bot.get_command('commits'))

    async def search_callback(interaction: discord.Interaction):
        # Opening a modal is itself the response to the interaction.
        await interaction.response.send_modal(search_modal_cls("message"))

    async def status_callback(interaction: discord.Interaction):
        await interaction.response.defer()
        await ctx.invoke(bot.get_command('status'))

    # Quick actions view
    view = View(timeout=60)
    quick_actions = (
        ("📚 Browse Commits", discord.ButtonStyle.primary, "quick_browse", browse_callback),
        ("🔍 Search Commits", discord.ButtonStyle.secondary, "quick_search", search_callback),
        ("📊 View Status", discord.ButtonStyle.success, "quick_status", status_callback),
    )
    for label, style, custom_id, callback in quick_actions:
        button = Button(label=label, style=style, custom_id=custom_id)
        button.callback = callback
        view.add_item(button)

    await ctx.send(embed=embed, view=view)
@bot.group(name="commit", invoke_without_command=True)
async def commit_group(ctx):
    """Entry point of the ``!commit`` group; lists the subcommands when none was invoked."""
    if ctx.invoked_subcommand is not None:
        return

    # (usage, summary) pairs, rendered in this order.
    subcommand_help = (
        ("`!commit view <hash>`", "View a specific commit\nExample: `!commit view abc123`"),
        ("`!commit browse`", "Browse recent commits interactively"),
        ("`!commit search`", "Search for commits"),
        ("`!commit diff <hash>`", "Show diff for a commit"),
        ("`!commit latest [count]`", "Show latest commits (default: 10)"),
    )

    overview = discord.Embed(
        title="📝 Commit Commands",
        description="Available subcommands:",
        color=discord.Color.blue()
    )
    for usage, summary in subcommand_help:
        overview.add_field(name=usage, value=summary, inline=False)

    await ctx.send(embed=overview)
@commit_group.command(name="view")
async def commit_view(ctx, commit_hash: str):
    """View details of a specific commit.

    The commit is looked up by hash prefix among the commits returned by
    ``fetch_commits(50)`` and shown as an embed with action buttons.

    Args:
        ctx: Invocation context.
        commit_hash: Full or abbreviated commit hash.
    """
    try:
        commits = await fetch_commits(50)
        if not commits:
            # fetch_commits returns None on failure; report that explicitly
            # instead of surfacing a "'NoneType' is not iterable" error.
            await ctx.send("❌ Could not fetch commits.")
            return

        matching_commit = next(
            (c for c in commits if c['sha'].startswith(commit_hash)), None
        )
        if not matching_commit:
            await ctx.send(f"❌ Commit `{commit_hash}` not found.")
            return

        files = await fetch_commit_files(matching_commit['sha'])
        embed = create_commit_embed(matching_commit, files)
        view = CommitActionsView(matching_commit, files)
        await ctx.send(embed=embed, view=view)
    except Exception as e:
        await ctx.send(f"❌ Error: {str(e)}")
@commit_group.command(name="browse")
async def commit_browse(ctx, count: int = 20):
    """Fetch ``count`` recent commits (1-50) and present them in a paginated view."""
    try:
        if not 1 <= count <= 50:
            await ctx.send("❌ Please choose a count between 1 and 50.")
            return

        await ctx.send(f"🔍 Fetching {count} recent commits...")

        recent = await fetch_commits(count)
        if not recent:
            await ctx.send("❌ Could not fetch commits.")
            return

        pager = CommitsPaginatedView(recent, f"{REPO_OWNER}/{REPO_NAME}")
        first_page = pager.create_embed()
        await ctx.send(embed=first_page, view=pager)
    except Exception as exc:
        await ctx.send(f"❌ Error: {str(exc)}")
@commit_group.command(name="search")
async def commit_search(ctx):
    """Offer three buttons that open the search modal by message, author or hash."""
    # `import SearchModal` binds the module, which is not callable. The modal
    # class is presumably the same-named attribute inside it
    # (NOTE(review): confirm); fall back to the bound name itself in case the
    # import is later changed to import the class directly.
    search_modal_cls = getattr(SearchModal, 'SearchModal', SearchModal)

    def make_callback(search_type):
        """Build a button callback that opens the modal for one search type."""
        async def open_modal(interaction: discord.Interaction):
            await interaction.response.send_modal(search_modal_cls(search_type))
        return open_modal

    view = View(timeout=60)

    # Create buttons
    button_specs = (
        ("📝 Search by Message", "📝", "message"),
        ("👤 Search by Author", "👤", "author"),
        ("🔑 Search by Hash", "🔑", "hash"),
    )
    for label, emoji, search_type in button_specs:
        button = Button(label=label, style=discord.ButtonStyle.primary, emoji=emoji)
        button.callback = make_callback(search_type)
        view.add_item(button)

    embed = discord.Embed(
        title="🔍 Search Commits",
        description="Choose how you want to search:",
        color=discord.Color.purple()
    )
    embed.add_field(name="📝 By Message", value="Search in commit messages", inline=True)
    embed.add_field(name="👤 By Author", value="Search by author name", inline=True)
    embed.add_field(name="🔑 By Hash", value="Search by commit hash (partial)", inline=True)

    await ctx.send(embed=embed, view=view)
@commit_group.command(name="diff")
async def commit_diff(ctx, commit_hash: str):
    """Show the diff for a commit.

    When the context carries an interaction, the work is delegated to
    ``show_commit_diff_interactive``. Otherwise (``ctx.interaction`` is
    ``None``) the diff is sent to the channel with ``ctx.send``, because the
    helper can only answer through ``interaction.followup``.

    Args:
        ctx: Invocation context.
        commit_hash: Full or abbreviated commit hash.
    """
    interaction = getattr(ctx, 'interaction', None)
    if interaction is not None:
        # Follow-ups need an acknowledged interaction.
        if not interaction.response.is_done():
            await interaction.response.defer(ephemeral=True)
        await show_commit_diff_interactive(interaction, commit_hash)
        return

    try:
        commits = await fetch_commits(50)
        if not commits:
            await ctx.send("❌ Could not fetch commits.")
            return

        matching_commit = next(
            (c for c in commits if c['sha'].startswith(commit_hash)), None
        )
        if not matching_commit:
            await ctx.send(f"❌ Commit `{commit_hash}` not found.")
            return

        diff_text = await fetch_commit_diff(matching_commit['sha'])
        if not diff_text:
            await ctx.send("❌ Could not fetch diff.")
            return

        await ctx.send(embed=discord.Embed(
            title=f"📊 Diff for commit `{matching_commit['sha'][:7]}`",
            color=discord.Color.blue()
        ))

        # Pack whole lines into messages of at most ~1800 characters so each
        # stays under Discord's 2000-character message limit; over-long lines
        # are hard-wrapped.
        buffer = []
        used = 0
        for line in diff_text.split('\n'):
            for start in range(0, max(len(line), 1), 1799):
                piece = line[start:start + 1799]
                if buffer and used + len(piece) + 1 > 1800:
                    await ctx.send("```diff\n" + '\n'.join(buffer) + "\n```")
                    buffer = []
                    used = 0
                buffer.append(piece)
                used += len(piece) + 1
        if buffer:
            await ctx.send("```diff\n" + '\n'.join(buffer) + "\n```")
    except Exception as e:
        await ctx.send(f"❌ Error: {str(e)}")
@commit_group.command(name="latest")
async def commit_latest(ctx, count: int = 10):
    """Show the latest commits by delegating to ``commit_browse`` (default count: 10)."""
    await commit_browse(ctx, count=count)
@bot.command(name="commits")
async def commits_browse(ctx, count: int = 20):
    """Top-level alias for ``!commit browse``; forwards ``count`` unchanged."""
    await commit_browse(ctx, count=count)
@bot.command(name="files")
async def files_browse(ctx, commit_hash: str):
    """Browse the files changed in a commit.

    The commit is looked up by hash prefix among the commits returned by
    ``fetch_commits(50)``; its files are listed in an embed next to a file
    browser component.

    Args:
        ctx: Invocation context.
        commit_hash: Full or abbreviated commit hash.
    """
    try:
        commits = await fetch_commits(50)
        if not commits:
            # fetch_commits returns None on failure; do not iterate it.
            await ctx.send("❌ Could not fetch commits.")
            return

        matching_commit = next(
            (c for c in commits if c['sha'].startswith(commit_hash)), None
        )
        if not matching_commit:
            await ctx.send(f"❌ Commit `{commit_hash}` not found.")
            return

        files = await fetch_commit_files(matching_commit['sha'])
        if not files:
            await ctx.send("❌ No files changed in this commit.")
            return

        # `import FileBrowserSelect` binds the module, which is not callable.
        # The class is presumably the same-named attribute inside it
        # (NOTE(review): confirm, and confirm that it is usable as `view=`);
        # fall back to the bound name in case the import is changed later.
        browser_cls = getattr(FileBrowserSelect, 'FileBrowserSelect', FileBrowserSelect)
        view = browser_cls(files, matching_commit['sha'])

        embed = discord.Embed(
            title=f"📁 Files in commit `{matching_commit['sha'][:7]}`",
            description=f"**{len(files)}** files changed\n"
                        f"Select a file from the dropdown below:",
            color=discord.Color.blue()
        )

        # Preview the first five paths; .get() tolerates entries without one.
        file_preview = "\n".join(
            f"• `{f.get('filename', 'Unknown')}`" for f in files[:5]
        )
        if len(files) > 5:
            file_preview += f"\n... and {len(files) - 5} more"
        embed.add_field(name="Files Changed", value=file_preview, inline=False)

        await ctx.send(embed=embed, view=view)
    except Exception as e:
        await ctx.send(f"❌ Error: {str(e)}")
@bot.command(name="status")
async def bot_status(ctx):
    """Show bot status and repository info, with Refresh and Browse buttons.

    Reads the module-level ``last_commit_sha`` (never assigns it) and fetches
    the newest commit with ``fetch_commits(1)``.
    """
    commits = await fetch_commits(1)
    latest_commit = commits[0] if commits else None

    embed = discord.Embed(
        title="🤖 Bot Status Dashboard",
        color=discord.Color.green()
    )

    # Repository info
    embed.add_field(
        name="📦 Repository",
        value=f"[{REPO_OWNER}/{REPO_NAME}]({GITEA_BASE_URL}/{REPO_OWNER}/{REPO_NAME})",
        inline=False
    )

    # Branch and monitoring
    embed.add_field(name="🌿 Branch", value=BRANCH, inline=True)
    embed.add_field(name="⏰ Check Interval", value=f"{CHECK_INTERVAL}s", inline=True)
    embed.add_field(
        name="✅ Monitoring",
        value="🟢 Active" if check_commits.is_running() else "🔴 Inactive",
        inline=True
    )

    # Last commit info
    if last_commit_sha:
        embed.add_field(name="📝 Last Known Commit", value=f"`{last_commit_sha[:7]}`", inline=True)

    if latest_commit:
        msg = latest_commit['commit']['message'].split('\n')[0][:100]
        embed.add_field(
            name="🆕 Latest Commit",
            value=f"[`{latest_commit['sha'][:7]}`]({latest_commit['html_url']})\n```{msg}```",
            inline=False
        )

    # Quick actions
    embed.add_field(
        name="🚀 Quick Actions",
        value="• `!commits` - Browse recent commits\n"
              "• `!commit view <hash>` - View a specific commit\n"
              "• `!files <hash>` - Browse files in a commit",
        inline=False
    )

    # Create status view with buttons
    view = View(timeout=60)
    refresh_btn = Button(label="🔄 Refresh", style=discord.ButtonStyle.secondary, emoji="🔄")
    browse_btn = Button(label="📚 Browse Commits", style=discord.ButtonStyle.primary, emoji="📚")

    async def refresh_callback(interaction: discord.Interaction):
        # Acknowledge before the (network-bound) re-invoke; deferring
        # afterwards risks the interaction expiring first.
        await interaction.response.defer()
        await ctx.invoke(bot.get_command('status'))

    async def browse_callback(interaction: discord.Interaction):
        # Without an acknowledgement the client shows the interaction as failed.
        await interaction.response.defer()
        await ctx.invoke(bot.get_command('commits'))

    refresh_btn.callback = refresh_callback
    browse_btn.callback = browse_callback

    view.add_item(refresh_btn)
    view.add_item(browse_btn)

    await ctx.send(embed=embed, view=view)
@bot.command(name="monitor")
@commands.has_permissions(administrator=True)
async def monitor_config(ctx, branch: str = None):
    """Configure monitoring settings (Admin only).

    With a ``branch`` argument the module-level ``BRANCH`` is switched to it;
    without one the current setting and usage hints are shown.

    Args:
        ctx: Invocation context.
        branch: Name of the branch to monitor from now on, or ``None``.
    """
    global BRANCH, last_commit_sha

    if branch:
        BRANCH = branch
        # Forget the previous branch's head so the next poll re-baselines.
        # Otherwise the old SHA is not found in the new branch's commit list
        # and every fetched commit is announced as new.
        last_commit_sha = None
        await ctx.send(f"✅ Now monitoring branch: **{BRANCH}**")
    else:
        embed = discord.Embed(
            title="⚙️ Monitor Configuration",
            description=f"Currently monitoring: **{BRANCH}**\n\n"
                        f"To change branch:\n`!monitor <branch_name>`\n\n"
                        f"Example: `!monitor develop`",
            color=discord.Color.orange()
        )
        await ctx.send(embed=embed)
# ---------- BOT EVENTS ----------
@bot.event
async def on_ready():
    """Log the start-up configuration, start the commit poller and set the presence."""
    print(f'🤖 Logged in as {bot.user.name} ({bot.user.id})')
    print(f'📦 Monitoring: {REPO_OWNER}/{REPO_NAME}@{BRANCH}')
    print(f'📺 Channel ID: {CHANNEL_ID}')
    print(f'🔗 Gitea URL: {GITEA_BASE_URL}')

    # Report whether the notification channel is visible to the bot.
    target = bot.get_channel(CHANNEL_ID)
    if not target:
        print(f'❌ Cannot find channel with ID {CHANNEL_ID}')
    else:
        print(f'✅ Found channel: #{target.name} in {target.guild.name}')

    # The running check keeps a repeated on_ready from starting the loop twice.
    if not check_commits.is_running():
        check_commits.start()
        print("✅ Started commit monitoring")

    watching = discord.Activity(
        type=discord.ActivityType.watching,
        name=f"{REPO_NAME} commits"
    )
    await bot.change_presence(activity=watching)
# ---------- ERROR HANDLING ----------
@bot.event
async def on_command_error(ctx, error):
    """Reply with a red error embed chosen by error type; unrecognised errors are also printed."""
    unexpected = False

    if isinstance(error, commands.CommandNotFound):
        title = "❌ Command Not Found"
        description = f"Use `!help` to see available commands."
    elif isinstance(error, commands.MissingPermissions):
        title = "❌ Permission Denied"
        description = "You don't have permission to use this command."
    elif isinstance(error, commands.MissingRequiredArgument):
        title = "❌ Missing Argument"
        description = (f"Missing required argument: `{error.param.name}`\n"
                       f"Use `!help {ctx.command.name}` for usage.")
    else:
        unexpected = True
        title = "❌ Unexpected Error"
        description = f"```{str(error)[:500]}```"

    notice = discord.Embed(
        title=title,
        description=description,
        color=discord.Color.red()
    )
    await ctx.send(embed=notice)

    # Only the catch-all case is logged, and only after the reply went out.
    if unexpected:
        print(f"Command error: {error}")
# ---------- RUN BOT ----------
if __name__ == '__main__':
    # Validate the required settings before connecting. SystemExit is raised
    # directly because the `exit()` helper is injected by the `site` module
    # for interactive use and is not guaranteed to exist in every runtime.
    if not DISCORD_TOKEN:
        print("❌ Error: DISCORD_BOT_TOKEN not set in .env file")
        raise SystemExit(1)

    if CHANNEL_ID == 0:
        print("❌ Error: DISCORD_CHANNEL_ID not set in .env file")
        raise SystemExit(1)

    try:
        bot.run(DISCORD_TOKEN)
    except Exception as e:
        print(f"❌ Fatal error: {e}")
        raise SystemExit(1)