Files
bytechat/src/app.py

548 lines
19 KiB
Python

import eventlet
eventlet.monkey_patch()
from flask import Flask, make_response, render_template, request, jsonify
from flask_socketio import SocketIO, emit, join_room, leave_room, disconnect
import collections
import threading
import time
import uuid
import json
import secrets
import hashlib
import hmac
import re
import xml.etree.ElementTree as ET
import requests
from datetime import datetime, timedelta
import logging
from functools import wraps
import os
# Configure root logging: INFO and above, each record prefixed with a timestamp and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)
# NOTE(review): when the SECRET_KEY environment variable is unset the app falls
# back to this fixed string, which is visible in the source — set the variable
# in any real deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production')
# NOTE(review): cors_allowed_origins="*" places no restriction on the origin of
# Socket.IO connections — confirm this is intended outside development.
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='eventlet')
# Security constants
MAX_MESSAGES = 512  # default capacity of each room's CircularMessageBuffer
MAX_ROOM_ID_LENGTH = 32  # longest room ID accepted by is_valid_room_id
MAX_MESSAGE_SIZE = 8192  # upper bound on len() of an encrypted message (intended as 8KB)
MAX_ROOMS_PER_IP = 100  # rooms a single client IP may create (checked in handle_join_room)
MAX_USERS_PER_ROOM = 1000  # members (stored public keys) allowed per room
ROOM_CLEANUP_INTERVAL = 3600  # seconds the cleanup worker sleeps between passes (1 hour)
USER_SESSION_TIMEOUT = 3600  # seconds; default inactivity timeout of CircularMessageBuffer.is_expired (1 hour)
# In-memory storage: plain module-level dicts, so nothing survives a restart.
chat_rooms = {}  # room_id -> CircularMessageBuffer
room_keys = {}  # room_id -> {user_id: public_key}
user_sessions = {}  # Socket.IO sid -> session dict (user_id, room, display_name, ip, connected_at, last_activity)
room_passwords = {}  # room_id -> salted hash from hash_password(); present only for password-protected rooms
room_creation_times = {}  # room_id -> time.time() when the room was created
ip_room_count = {}  # client IP -> number of rooms created from that IP
failed_password_attempts = {}  # f"{client_ip}_{room_id}" -> count of failed password attempts
message_hashes = {}  # room_id -> set of SHA-256 hex digests used for duplicate detection
class CircularMessageBuffer:
    """Fixed-capacity, lock-protected message history for one chat room.

    Messages are kept in a ``collections.deque`` created with
    ``maxlen=max_size``; once the buffer is full, each append evicts the
    oldest entry.
    """

    def __init__(self, max_size=MAX_MESSAGES):
        """Create an empty buffer holding at most ``max_size`` messages."""
        now = time.time()
        self.buffer = collections.deque(maxlen=max_size)
        self.lock = threading.Lock()
        self.creation_time = now
        self.last_activity = now

    def add_message(self, message_data):
        """Append a message and return the record that was stored.

        Args:
            message_data: Mapping with required ``encrypted_content`` and
                ``sender_id`` keys and optional ``iv`` and ``display_name``.

        Returns:
            The stored dict, with keys ``id``, ``encrypted_content``,
            ``sender_id``, ``display_name``, ``timestamp``, ``iv`` and
            ``hash``.

        Raises:
            KeyError: If ``encrypted_content`` or ``sender_id`` is missing.
        """
        encrypted_content = message_data['encrypted_content']
        sender_id = message_data['sender_id']
        # Hash over content, sender and IV; a missing IV hashes as "".
        message_hash = hashlib.sha256(
            f"{encrypted_content}{sender_id}{message_data.get('iv', '')}".encode()
        ).hexdigest()
        now = time.time()
        record = {
            'id': str(uuid.uuid4()),
            'encrypted_content': encrypted_content,
            'sender_id': sender_id,
            # Keep the display name so replayed history carries the same
            # fields as the message that was broadcast live.
            'display_name': message_data.get('display_name'),
            'timestamp': now,
            'iv': message_data.get('iv'),
            'hash': message_hash,
        }
        with self.lock:
            self.buffer.append(record)
            self.last_activity = now
        return record

    def get_messages(self):
        """Return a list snapshot of the buffered messages, oldest first."""
        with self.lock:
            return list(self.buffer)

    def get_message_count(self):
        """Return how many messages are currently buffered."""
        with self.lock:
            return len(self.buffer)

    def is_expired(self, timeout_seconds=USER_SESSION_TIMEOUT):
        """Return True if no message was added in the last ``timeout_seconds``."""
        return time.time() - self.last_activity > timeout_seconds
# Security validation functions
def is_valid_room_id(room_id):
    """Return True when ``room_id`` is an acceptable room identifier.

    A valid ID is a non-empty ``str`` of at most ``MAX_ROOM_ID_LENGTH``
    characters made up only of ASCII letters, digits, ``-`` and ``_``.
    Any non-string value is rejected rather than raising.
    """
    if not isinstance(room_id, str):
        return False
    if not room_id or len(room_id) > MAX_ROOM_ID_LENGTH:
        return False
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "room\n" pass validation.
    return re.fullmatch(r'[a-zA-Z0-9\-_]+', room_id) is not None
def is_valid_message(encrypted_content, iv=None):
    """Check an encrypted payload and its optional IV against size limits.

    The payload must be non-empty and no longer than ``MAX_MESSAGE_SIZE``;
    an IV, when supplied, must be no longer than 256.
    """
    if not (encrypted_content and len(encrypted_content) <= MAX_MESSAGE_SIZE):
        return False
    # An absent/empty IV is acceptable; only an oversized one is rejected.
    return not (iv and len(iv) > 256)
def is_valid_public_key(public_key):
    """Return True when ``public_key`` looks like a base64-encoded key.

    The value must be a non-empty ``str`` or ``bytes`` of at most 2048
    characters that ``base64.b64decode`` accepts. Any other type is
    rejected instead of raising ``TypeError`` from ``len()``.

    NOTE(review): decoding is non-strict, so characters outside the base64
    alphabet are ignored rather than rejected. Switching to
    ``validate=True`` would be stricter but may reject keys the client
    currently sends (e.g. line-wrapped base64) — confirm before tightening.
    """
    import base64

    if not isinstance(public_key, (str, bytes)):
        return False
    if not public_key or len(public_key) > 2048:  # RSA-2048 base64 encoded
        return False
    try:
        base64.b64decode(public_key)
    except ValueError:
        # Covers binascii.Error (bad padding) and non-ASCII str input.
        return False
    return True
def hash_password(password, salt=None):
    """Derive a salted PBKDF2-HMAC-SHA256 hash of ``password``.

    Args:
        password: Plain-text password (``str``).
        salt: Optional hex-encoded salt; a random 32-byte salt is generated
            when omitted.

    Returns:
        The hex-encoded salt immediately followed by the hex-encoded
        derived key, as one string.
    """
    salt_bytes = secrets.token_bytes(32) if salt is None else bytes.fromhex(salt)
    derived_key = hashlib.pbkdf2_hmac('sha256', password.encode(), salt_bytes, 100000)
    return f"{salt_bytes.hex()}{derived_key.hex()}"
def verify_password(password, hashed):
    """Check ``password`` against a value produced by ``hash_password``.

    The first 64 hex characters of ``hashed`` are the salt and the rest is
    the expected derived key. Returns False for a mismatch and for any
    malformed input (every exception is treated as a failed check).
    """
    try:
        salt_hex, expected_hex = hashed[:64], hashed[64:]
        candidate = hashlib.pbkdf2_hmac(
            'sha256', password.encode(), bytes.fromhex(salt_hex), 100000
        )
        # Constant-time comparison of the two hex digests.
        return hmac.compare_digest(expected_hex, candidate.hex())
    except Exception:
        return False
def cleanup_expired_rooms():
    """Remove every room whose message buffer reports itself as expired.

    Only rooms are reaped here (via ``cleanup_room``); entries in
    ``user_sessions`` are not touched by this function.
    """
    # Work from a snapshot so that removing rooms (here or elsewhere) cannot
    # change the dict while it is being iterated.
    expired_rooms = [
        room_id
        for room_id, room_buffer in list(chat_rooms.items())
        if room_buffer.is_expired()
    ]
    for room_id in expired_rooms:
        logger.info(f"Cleaning up expired room: {room_id}")
        cleanup_room(room_id)
def cleanup_room(room_id):
    """Drop all server-side state held for ``room_id``.

    Removes the message buffer, member keys, password hash, creation time
    and duplicate-detection hashes, and also clears failed-password
    counters recorded for the room so that a later room with the same ID
    does not inherit old lockouts. Missing entries are ignored.
    """
    for store in (chat_rooms, room_keys, room_passwords,
                  room_creation_times, message_hashes):
        store.pop(room_id, None)
    # Attempt keys are built as f"{client_ip}_{room_id}". Splitting at the
    # first underscore assumes the IP part contains none (true for IPv4 and
    # IPv6 literals) while room IDs may contain underscores.
    stale_keys = [
        key for key in failed_password_attempts
        if key.partition('_')[2] == room_id
    ]
    for key in stale_keys:
        failed_password_attempts.pop(key, None)
def get_client_ip():
    """Return the client address for the current request.

    Preference order: first entry of ``X-Forwarded-For``, then
    ``X-Real-IP``, then ``request.remote_addr``.

    NOTE(review): both headers are taken at face value. Unless a trusted
    reverse proxy overwrites them, a client can set them itself — confirm
    the deployment before relying on this value for per-IP limits.
    """
    forwarded_for = request.headers.get('X-Forwarded-For')
    if forwarded_for:
        first_hop = forwarded_for.partition(',')[0]
        return first_hop.strip()
    real_ip = request.headers.get('X-Real-IP')
    if real_ip:
        return real_ip
    return request.remote_addr
# Authentication decorator
def require_valid_session(f):
    """Decorator: run the handler only for sockets with a known session.

    If ``request.sid`` has no entry in ``user_sessions`` the attempt is
    logged, the socket is disconnected and the handler is not called.
    """
    @wraps(f)
    def session_guard(*args, **kwargs):
        if request.sid in user_sessions:
            return f(*args, **kwargs)
        logger.warning(f"Invalid session attempt from {get_client_ip()}")
        disconnect()
        return None
    return session_guard
# CSP header as requested
@app.after_request
def apply_csp(response):
    """Attach the Content-Security-Policy header to every response."""
    policy_fragments = (
        "default-src 'self'; ",
        "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; ",
        "style-src 'self' 'unsafe-inline'; ",
        "img-src 'self' data:; ",
        "connect-src 'self' ws://localhost:5000 wss://localhost:5000;",
    )
    response.headers["Content-Security-Policy"] = "".join(policy_fragments)
    return response
@app.route('/')
def index():
    """Serve the chat page without a preselected room."""
    page = render_template('chat.html')
    return page
@app.route('/room/<room_id>')
def room(room_id):
    """Serve the chat page for ``room_id``, or 400 if the ID is invalid."""
    if is_valid_room_id(room_id):
        return render_template('chat.html', room_id=room_id)
    return "Invalid room ID", 400
@app.route('/api/room/<room_id>/info')
def get_room_info(room_id):
    """Return JSON describing a room.

    Responds 400 for an invalid ID, ``{"exists": false}`` for an unknown
    room, and otherwise the message and user counts with their limits.
    """
    if not is_valid_room_id(room_id):
        return jsonify({'error': 'Invalid room ID'}), 400
    if room_id not in chat_rooms:
        return jsonify({'exists': False})

    message_count = chat_rooms[room_id].get_message_count()
    # A room without a key table is reported as having zero users.
    members = room_keys.get(room_id, {})
    summary = {
        'exists': True,
        'message_count': message_count,
        'max_messages': MAX_MESSAGES,
        'user_count': len(members),
        'max_users': MAX_USERS_PER_ROOM
    }
    return jsonify(summary)
# Socket event handlers
@socketio.on('connect')
def handle_connect():
    """Register a session for a newly connected socket.

    Enforces a cap of 10 simultaneous connections per client IP; a socket
    over the cap is disconnected without a session being created.
    Otherwise a session with a fresh user ID is stored under
    ``request.sid`` and ``user_connected`` is emitted to the client.
    """
    max_connections_per_ip = 10
    client_ip = get_client_ip()
    # Check connection limits
    active_connections = sum(1 for session in user_sessions.values()
                             if session.get('ip') == client_ip)
    # The connecting socket is not in user_sessions yet, so ">=" is what
    # keeps the total at the limit (">" would admit one extra connection).
    if active_connections >= max_connections_per_ip:
        logger.warning(f"Too many connections from IP: {client_ip}")
        disconnect()
        return
    user_id = str(uuid.uuid4())
    now = time.time()
    user_sessions[request.sid] = {
        'user_id': user_id,
        'room': None,
        'display_name': f"User-{user_id[:8]}",
        'ip': client_ip,
        'connected_at': now,
        'last_activity': now
    }
    emit('user_connected', {'user_id': user_id})
    logger.info(f"User {user_id} connected from {client_ip}")
@socketio.on('disconnect')
def handle_disconnect():
    """Tear down the session of a socket that has disconnected.

    If the socket was in a room: leave it, drop the user's public key,
    tell the room the user left, and clean the room up when no keys
    remain. Sockets without a session are ignored.
    """
    sid = request.sid
    if sid not in user_sessions:
        return

    session = user_sessions[sid]
    room_id = session.get('room')
    user_id = session.get('user_id')

    if room_id:
        leave_room(room_id)
        # Remove the user's public key, if one was registered.
        if room_id in room_keys:
            room_keys[room_id].pop(user_id, None)
        emit('user_left', {'user_id': user_id}, room=room_id)
        # A room with no remaining keys is discarded.
        if room_id in room_keys and not room_keys[room_id]:
            cleanup_room(room_id)

    del user_sessions[sid]
    logger.info(f"User {user_id} disconnected")
def _leave_previous_room(session, next_room_id):
    """Detach ``session`` from the room it is currently in, if any.

    Removes the session's public key from that room, notifies the room,
    and cleans the room up when it becomes empty — unless it is the room
    being (re)joined, whose history and password must survive.
    """
    previous_room = session.get('room')
    if not previous_room:
        return
    previous_user_id = session.get('user_id')
    leave_room(previous_room)
    members = room_keys.get(previous_room)
    if members is not None:
        members.pop(previous_user_id, None)
    emit('user_left', {'user_id': previous_user_id}, room=previous_room)
    if members is not None and not members and previous_room != next_room_id:
        cleanup_room(previous_room)
    session['room'] = None


@socketio.on('join_room')
@require_valid_session
def handle_join_room(data):
    """Join (or create) a room and announce the new member.

    Expects ``data`` to be a dict with ``room_id``, ``public_key`` and an
    optional ``password``. Every outcome is reported to the caller through
    a ``room_joined`` event: on failure it carries an ``error`` string, on
    success the room state (messages, members and their public keys).

    A room that does not exist yet is created, subject to the per-IP room
    limit, and protected with ``password`` when one is given. Joining an
    existing room is subject to the capacity limit and, for protected
    rooms, to password verification with at most 5 failed attempts per
    IP and room. The session receives a fresh user ID for the room, and
    any room the session was in before is left first.
    """
    try:
        if not isinstance(data, dict):
            emit('room_joined', {'error': 'Invalid request'})
            return

        raw_room_id = data.get('room_id', '')
        room_id = raw_room_id.strip() if isinstance(raw_room_id, str) else ''
        public_key = data.get('public_key', '')
        password = data.get('password') or ''

        # Validation
        if not is_valid_room_id(room_id):
            emit('room_joined', {'error': 'Invalid room ID format'})
            return
        if not isinstance(public_key, (str, bytes)) or not is_valid_public_key(public_key):
            emit('room_joined', {'error': 'Invalid public key format'})
            return
        if not isinstance(password, str):
            emit('room_joined', {'error': 'Invalid password format'})
            return

        session = user_sessions[request.sid]
        client_ip = get_client_ip()
        is_first_user = room_id not in chat_rooms

        if is_first_user:
            # Check room creation limits per IP
            rooms_by_ip = ip_room_count.get(client_ip, 0)
            if rooms_by_ip >= MAX_ROOMS_PER_IP:
                emit('room_joined', {'error': 'Too many rooms created from this IP'})
                return
            # Hash before touching shared state so that a hashing failure
            # cannot leave behind a half-created, unprotected room.
            password_hash = hash_password(password) if password else None
            # Create new room
            chat_rooms[room_id] = CircularMessageBuffer()
            room_keys[room_id] = {}
            room_creation_times[room_id] = time.time()
            message_hashes[room_id] = set()
            if password_hash is not None:
                room_passwords[room_id] = password_hash
            # Update IP room count
            ip_room_count[client_ip] = rooms_by_ip + 1
            logger.info(f"Room {room_id} created by {session['user_id']}")
        else:
            # Check room capacity
            if len(room_keys[room_id]) >= MAX_USERS_PER_ROOM:
                emit('room_joined', {'error': 'Room is full'})
                return
            # Verify password if room is password protected
            if room_id in room_passwords:
                if not password:
                    emit('room_joined', {'error': 'Password required'})
                    return
                # Check failed attempts
                attempt_key = f"{client_ip}_{room_id}"
                failed_attempts = failed_password_attempts.get(attempt_key, 0)
                if failed_attempts >= 5:  # Max 5 failed attempts
                    emit('room_joined', {'error': 'Too many failed password attempts'})
                    return
                if not verify_password(password, room_passwords[room_id]):
                    failed_password_attempts[attempt_key] = failed_attempts + 1
                    emit('room_joined', {'error': 'Incorrect password'})
                    return
                # Clear failed attempts on success
                failed_password_attempts.pop(attempt_key, None)

        # The session is about to get a new user ID; remove the old ID's
        # key from whatever room it was registered in so it does not linger
        # as a ghost member.
        _leave_previous_room(session, room_id)

        # Generate new UUID for this room session
        new_user_id = str(uuid.uuid4())
        display_name = f"User-{new_user_id[:8]}"
        session['user_id'] = new_user_id
        session['display_name'] = display_name
        session['last_activity'] = time.time()

        # Store user's public key
        room_keys[room_id][new_user_id] = public_key

        # Join the room
        join_room(room_id)
        session['room'] = room_id

        # Get room data
        messages = chat_rooms[room_id].get_messages()
        user_keys = room_keys[room_id]

        # Send room joined confirmation
        emit('room_joined', {
            'room_id': room_id,
            'user_id': new_user_id,
            'display_name': display_name,
            'messages': messages,
            'users': list(user_keys.keys()),
            'user_keys': user_keys,
            'message_count': chat_rooms[room_id].get_message_count(),
            'is_first_user': is_first_user
        })

        # Notify others about new user
        emit('user_joined', {
            'user_id': new_user_id,
            'display_name': display_name,
            'public_key': public_key
        }, room=room_id, include_self=False)

        # Handle session key distribution
        if is_first_user:
            # First user - they will generate the session key on client side
            logger.info(f"First user {new_user_id} joined room {room_id} - will generate session key")
        else:
            # Not first user - request session key from existing users
            logger.info(f"User {new_user_id} joined room {room_id} - requesting session key")
            emit('request_session_key', {
                'new_user_id': new_user_id,
                'public_key': public_key
            }, room=room_id, include_self=False)

        logger.info(f"User {new_user_id} joined room {room_id}")
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error in join_room: {str(e)}")
        emit('room_joined', {'error': 'Internal server error'})
@socketio.on('send_message')
@require_valid_session
def handle_send_message(data):
    """Store an encrypted message and broadcast it to the sender's room.

    Expects ``data`` to be a dict with ``encrypted_content`` and an
    optional ``iv``. Messages from sockets that are not in a known room,
    malformed or oversized payloads, and duplicates (same content, sender
    and IV) are dropped without a reply. The broadcast ``new_message``
    event reuses the ID and timestamp of the record stored in the room
    buffer so that live messages and replayed history agree.
    """
    try:
        user_data = user_sessions[request.sid]
        room_id = user_data['room']
        if not room_id or room_id not in chat_rooms:
            return

        if not isinstance(data, dict):
            logger.warning(f"Invalid message from user {user_data['user_id']}")
            return
        encrypted_content = data.get('encrypted_content', '')
        iv = data.get('iv', '')

        # Validate message: reject non-text payloads before size checks so
        # lists/dicts/numbers cannot slip through or raise.
        types_ok = (
            isinstance(encrypted_content, (str, bytes))
            and (iv is None or isinstance(iv, (str, bytes)))
        )
        if not types_ok or not is_valid_message(encrypted_content, iv):
            logger.warning(f"Invalid message from user {user_data['user_id']}")
            return

        # Check for duplicate messages
        message_hash = hashlib.sha256(
            f"{encrypted_content}{user_data['user_id']}{iv}".encode()
        ).hexdigest()
        seen_hashes = message_hashes.setdefault(room_id, set())
        if message_hash in seen_hashes:
            logger.warning(f"Duplicate message detected from user {user_data['user_id']}")
            return
        seen_hashes.add(message_hash)

        message_data = {
            'encrypted_content': encrypted_content,
            'sender_id': user_data['user_id'],
            'display_name': user_data['display_name'],
            'iv': iv
        }

        # Add to room buffer
        room_buffer = chat_rooms[room_id]
        room_buffer.add_message(message_data)

        # Limit message hash storage. A set has no order, so "keep the
        # newest N" cannot be done by slicing it; rebuild the set from the
        # messages the buffer still holds instead.
        if len(seen_hashes) > MAX_MESSAGES * 2:
            message_hashes[room_id] = {
                stored['hash'] for stored in room_buffer.get_messages()
            }

        # Reuse the stored record's id/timestamp for the broadcast. The hash
        # comparison confirms the newest record is the one just added; if it
        # is not, fall back to fresh values.
        message_id, timestamp = str(uuid.uuid4()), time.time()
        with room_buffer.lock:
            latest = room_buffer.buffer[-1] if room_buffer.buffer else None
        if latest is not None and latest.get('hash') == message_hash:
            message_id, timestamp = latest['id'], latest['timestamp']

        # Update user activity
        user_sessions[request.sid]['last_activity'] = time.time()

        # Broadcast to room
        emit('new_message', {
            'id': message_id,
            'encrypted_content': encrypted_content,
            'sender_id': user_data['user_id'],
            'display_name': user_data['display_name'],
            'timestamp': timestamp,
            'iv': iv,
            'message_count': room_buffer.get_message_count()
        }, room=room_id)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error in send_message: {str(e)}")
@socketio.on('share_session_key')
@require_valid_session
def handle_share_session_key(data):
    """Relay an encrypted session key from the sender to one room member.

    Requires ``room_id``, ``target_user_id`` and ``encrypted_key`` (at most
    2048 long) in ``data``, and the sender must currently be in
    ``room_id``. The key is emitted as ``session_key_received`` to the
    first session found for the target user in that room; invalid requests
    are logged and dropped.
    """
    try:
        room_id = data.get('room_id', '')
        target_user_id = data.get('target_user_id', '')
        encrypted_key = data.get('encrypted_key', '')

        # Validation
        if not (room_id and target_user_id and encrypted_key):
            logger.warning("Invalid share_session_key request")
            return
        if len(encrypted_key) > 2048:  # Reasonable limit for encrypted session key
            logger.warning("Encrypted key too large")
            return

        sender_session = user_sessions[request.sid]
        sender_id = sender_session['user_id']
        sender_room = sender_session.get('room')

        # The sender may only share keys for the room they are in.
        if room_id != sender_room:
            logger.warning(f"User {sender_id} tried to share key for room {room_id} but is in {sender_room}")
            return

        # Locate the target user's socket session within the same room.
        target_sid = next(
            (
                sid
                for sid, session in user_sessions.items()
                if session.get('user_id') == target_user_id
                and session.get('room') == room_id
            ),
            None,
        )

        if not target_sid:
            logger.warning(f"Target user {target_user_id} not found in room {room_id}")
            return

        # Deliver the encrypted key to the target socket only.
        emit('session_key_received', {
            'from_user_id': sender_id,
            'encrypted_key': encrypted_key
        }, room=target_sid)
        logger.info(f"Session key shared from {sender_id} to {target_user_id} in room {room_id}")
    except Exception as e:
        logger.error(f"Error in share_session_key: {str(e)}")
# Background cleanup task
def start_cleanup_task():
    """Start a daemon thread that periodically removes expired rooms.

    The worker calls ``cleanup_expired_rooms`` and then sleeps for
    ``ROOM_CLEANUP_INTERVAL`` seconds, forever. Errors in a pass are logged
    and do not stop the worker.
    """
    def cleanup_worker():
        while True:
            try:
                cleanup_expired_rooms()
            except Exception as e:
                logger.exception(f"Error in cleanup task: {str(e)}")
            # Sleep outside the try block: when a pass fails, the worker
            # must still wait a full interval instead of retrying (and
            # logging) in a tight loop.
            time.sleep(ROOM_CLEANUP_INTERVAL)

    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
    cleanup_thread.start()
# Error handlers
@app.errorhandler(404)
def not_found(e):
    """Answer unknown routes with a JSON body and status 404."""
    payload = {'error': 'Not found'}
    return jsonify(payload), 404
@app.errorhandler(500)
def internal_error(e):
    """Log the failure and answer with a generic JSON body and status 500."""
    logger.error(f"Internal server error: {str(e)}")
    payload = {'error': 'Internal server error'}
    return jsonify(payload), 500
@app.route("/rss")
def rss_proxy():
    """Fetch the upstream releases RSS feed and return its items as JSON.

    Returns:
        A JSON list with one object per ``<item>`` holding ``title``,
        ``link``, ``pubDate``, ``author`` and ``description`` (``null``
        for absent elements), or a JSON error with status 502 when the
        feed cannot be fetched or parsed.
    """
    url = "https://rattatwinko.servecounterstrike.com/gitea/rattatwinko/bytechat-desktop/releases.rss"
    try:
        # Without a timeout a stalled upstream would hold this request
        # open indefinitely.
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        # Parse the raw bytes so the XML declaration's encoding is honoured
        # instead of whatever encoding requests guessed for .text.
        root = ET.fromstring(resp.content)
    except requests.RequestException as e:
        logger.error(f"RSS upstream request failed: {e}")
        return jsonify({'error': 'Upstream feed unavailable'}), 502
    except ET.ParseError as e:
        logger.error(f"RSS upstream returned invalid XML: {e}")
        return jsonify({'error': 'Upstream feed is not valid XML'}), 502

    fields = ("title", "link", "pubDate", "author", "description")
    items = [
        {field: item.findtext(field) for field in fields}
        for item in root.findall("./channel/item")
    ]
    return jsonify(items)
if __name__ == "__main__":
    import sys

    # Debug mode is opt-in via FLASK_DEBUG. The server binds to every
    # interface (0.0.0.0), and Flask's debug tooling must never be
    # reachable from an untrusted network, so it is off by default.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    try:
        start_cleanup_task()
        socketio.run(app, debug=debug_enabled, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True)
    except BrokenPipeError:
        # Suppress noisy broken pipe errors (client disconnects)
        print("Broken pipe error suppressed.", file=sys.stderr)