This commit is contained in:
2025-11-16 18:01:30 +01:00
commit 858003cb0b
26 changed files with 4712 additions and 0 deletions

2
modules/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""Modules package for pydoc translation service."""

252
modules/cache.py Normal file
View File

@@ -0,0 +1,252 @@
"""
Caching module with extensible backend support.
To add a new cache backend:
1. Create a class that inherits from CacheBackend
2. Implement get(), set(), and clear() methods
3. Register it in the CacheService class
"""
from abc import ABC, abstractmethod
from typing import Optional
import json
import hashlib
import sqlite3
import os
from datetime import datetime, timedelta
class CacheBackend(ABC):
    """Interface every cache storage backend has to implement.

    Keys and values are plain strings. The concrete backends in this module
    return ``True`` from the mutating methods when the operation succeeded.
    """

    @abstractmethod
    def get(self, key: str) -> Optional[str]:
        """Look up *key* and return its stored value, if any."""

    @abstractmethod
    def set(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Store *value* under *key*; *ttl* is an optional lifetime in seconds."""

    @abstractmethod
    def clear(self) -> bool:
        """Drop every entry held by the backend."""
class InMemoryCache(CacheBackend):
    """
    Simple in-memory cache backend.

    Values are kept in a plain dictionary, so all data is lost when the
    application restarts.
    """

    def __init__(self):
        self._cache = {}
        self._ttl = {}  # key -> datetime after which the entry counts as expired

    def get(self, key: str) -> Optional[str]:
        """Return the cached value for *key*, or ``None`` if missing or expired.

        Expired entries are evicted lazily by the read that discovers them.
        """
        if key not in self._cache:
            return None
        expires_at = self._ttl.get(key)
        if expires_at is not None and datetime.now() > expires_at:
            del self._cache[key]
            del self._ttl[key]
            return None
        return self._cache[key]

    def set(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Store *value* under *key*.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Lifetime in seconds. ``None`` (or ``0``) means the entry
                never expires.

        Returns:
            Always ``True``.
        """
        self._cache[key] = value
        if ttl:
            self._ttl[key] = datetime.now() + timedelta(seconds=ttl)
        else:
            # Overwriting a key without a TTL must also discard any deadline
            # left over from an earlier set(); otherwise the new value would
            # silently expire on the old schedule.
            self._ttl.pop(key, None)
        return True

    def clear(self) -> bool:
        """Remove all entries and their expiry deadlines. Always returns ``True``."""
        self._cache.clear()
        self._ttl.clear()
        return True
class SQLiteCache(CacheBackend):
    """
    SQLite-based cache backend.

    Persists data to a SQLite database file. Every operation opens its own
    short-lived connection, which is closed again even when a statement fails.
    """

    def __init__(self, db_path: str = "cache.db"):
        """Remember *db_path* and make sure the cache table exists."""
        self.db_path = db_path
        self._init_db()

    def _execute(self, sql: str, params: tuple = (), fetch_one: bool = False):
        """Run one statement on a fresh connection and commit it.

        Args:
            sql: SQL text with ``?`` placeholders.
            params: Values bound to the placeholders.
            fetch_one: When true, return the first result row (or ``None``).

        Returns:
            The first row if *fetch_one* is set, otherwise ``None``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            row = cursor.fetchone() if fetch_one else None
            conn.commit()
            return row
        finally:
            # Close unconditionally so a failing statement cannot leak the
            # connection (and the file handle behind it).
            conn.close()

    def _init_db(self):
        """Initialize the SQLite database (table plus expiry index)."""
        self._execute('''
            CREATE TABLE IF NOT EXISTS cache (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                expires_at REAL,
                created_at REAL NOT NULL
            )
        ''')
        self._execute('CREATE INDEX IF NOT EXISTS idx_expires ON cache(expires_at)')

    def _cleanup_expired(self):
        """Delete every row whose ``expires_at`` timestamp lies in the past."""
        self._execute(
            'DELETE FROM cache WHERE expires_at IS NOT NULL AND expires_at < ?',
            (datetime.now().timestamp(),),
        )

    def get(self, key: str) -> Optional[str]:
        """Return the value stored under *key*, or ``None`` if absent or expired."""
        # Purging first means the plain lookup below can never see stale rows.
        self._cleanup_expired()
        row = self._execute('SELECT value FROM cache WHERE key = ?', (key,), fetch_one=True)
        return row[0] if row else None

    def set(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Insert or replace the entry for *key*.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Lifetime in seconds. ``None`` (or ``0``) stores the entry
                without an expiry time.

        Returns:
            Always ``True``; database errors propagate as exceptions.
        """
        now = datetime.now()
        expires_at = (now + timedelta(seconds=ttl)).timestamp() if ttl else None
        self._execute('''
            INSERT OR REPLACE INTO cache (key, value, expires_at, created_at)
            VALUES (?, ?, ?, ?)
        ''', (key, value, expires_at, now.timestamp()))
        return True

    def clear(self) -> bool:
        """Delete all cache rows. Always returns ``True``."""
        self._execute('DELETE FROM cache')
        return True
class RedisCache(CacheBackend):
    """
    Redis-based cache backend.

    Requires the ``redis`` library. The server URL is taken from the
    ``redis_url`` argument, then the ``REDIS_URL`` environment variable, and
    finally defaults to ``redis://localhost:6379/0``.
    """

    def __init__(self, redis_url: Optional[str] = None):
        """Connect to Redis and verify the connection with a ping.

        Raises:
            ImportError: If the ``redis`` package is not installed.
            ConnectionError: If the client cannot be created or the ping fails.
        """
        try:
            import redis
        except ImportError as exc:
            raise ImportError("redis library not installed. Install with: pip install redis") from exc

        self.redis_url = redis_url or os.getenv('REDIS_URL', 'redis://localhost:6379/0')
        try:
            self.client = redis.from_url(self.redis_url, decode_responses=True)
            # Test connection
            self.client.ping()
        except Exception as exc:
            raise ConnectionError(f"Failed to connect to Redis: {exc}") from exc

    def get(self, key: str) -> Optional[str]:
        """Return the value for *key*, or ``None`` if missing or on a Redis error."""
        try:
            return self.client.get(key)
        except Exception as exc:
            # Best effort: a broken cache must not take the caller down.
            print(f"Redis get error: {exc}")
            return None

    def set(self, key: str, value: str, ttl: Optional[int] = None) -> bool:
        """Store *value* under *key*, with an expiry when *ttl* is truthy.

        Returns:
            ``True`` on success, ``False`` if Redis raised an error.
        """
        try:
            if ttl:
                self.client.setex(key, ttl, value)
            else:
                self.client.set(key, value)
            return True
        except Exception as exc:
            print(f"Redis set error: {exc}")
            return False

    def clear(self) -> bool:
        """Flush the current Redis database.

        Returns:
            ``True`` on success, ``False`` if Redis raised an error.
        """
        try:
            self.client.flushdb()
            return True
        except Exception as exc:
            print(f"Redis clear error: {exc}")
            return False
class CacheService:
"""
Cache service that manages cache backends.
Provides a unified interface for caching translated documentation.
"""
def __init__(self, backend: Optional[str] = None, **kwargs):
"""
Initialize cache service.
Args:
backend: Backend name ('memory', 'sqlite', 'redis'). Defaults to 'sqlite'.
**kwargs: Additional arguments for backend initialization.
"""
self.backend_name = backend or os.getenv('CACHE_BACKEND', 'sqlite')
self.backend = self._create_backend(self.backend_name, **kwargs)
def _create_backend(self, backend_name: str, **kwargs) -> CacheBackend:
"""Create a cache backend instance."""
backends = {
'memory': InMemoryCache,
'sqlite': SQLiteCache,
'redis': RedisCache,
}
backend_class = backends.get(backend_name.lower())
if not backend_class:
raise ValueError(f"Unknown cache backend: {backend_name}")
return backend_class(**kwargs)
def _make_key(self, object_name: str, target_lang: str) -> str:
"""Generate a cache key from object name and target language."""
key_string = f"{object_name}:{target_lang}"
return hashlib.md5(key_string.encode()).hexdigest()
def get(self, object_name: str, target_lang: str) -> Optional[dict]:
"""Get cached translation."""
key = self._make_key(object_name, target_lang)
value = self.backend.get(key)
if value:
try:
return json.loads(value)
except json.JSONDecodeError:
return None
return None
def set(self, object_name: str, target_lang: str, data: dict, ttl: Optional[int] = None) -> bool:
"""Cache a translation."""
key = self._make_key(object_name, target_lang)
value = json.dumps(data)
return self.backend.set(key, value, ttl)
def clear(self) -> bool:
"""Clear all cache entries."""
return self.backend.clear()

228
modules/course_scraper.py Normal file
View File

@@ -0,0 +1,228 @@
"""
Module for scraping and organizing Python course content.
"""
import requests
from bs4 import BeautifulSoup
from typing import Dict, List, Optional
import re
class CourseScraper:
    """Scrapes Python course content from external sources."""

    _COURSE_TITLE = 'Python Kurs - Gymnasium Hartberg'

    # Pages fetched by scrape_course_content(), as (url, label) pairs.
    # Only the URL is used; the label documents what the page contains.
    _COURSE_PAGES = (
        ('https://benschi11.github.io/python/class5.html', '5. Klasse'),
        ('https://benschi11.github.io/python/', 'Overview'),
    )

    _USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

    # Fallback outline used when nothing could be scraped:
    # (title, navigation id, topics).
    _DEFAULT_OUTLINE = (
        ('5. Klasse', '5-klasse', (
            'Grundlagen der Programmierung mit Python',
            'Variablen und Datentypen',
            'Eingabe und Ausgabe',
            'Bedingte Anweisungen',
            'Schleifen',
            'Listen und Dictionaries',
        )),
        ('6. Klasse', '6-klasse', (
            'Funktionen',
            'Module und Pakete',
            'Dateiverarbeitung',
            'Fehlerbehandlung',
            'Objektorientierte Programmierung',
        )),
        ('Objektorientierte Programmierung', 'objektorientierte-programmierung', (
            'Klassen und Objekte',
            'Vererbung',
            'Polymorphismus',
            'Abstrakte Klassen',
        )),
        ('Grafische Oberflächen', 'grafische-oberflaechen', (
            'Einführung in GUI-Programmierung',
            'Tkinter Grundlagen',
            'Event-Handling',
            'Layout-Management',
        )),
    )

    @staticmethod
    def scrape_course_content(url: Optional[str] = None) -> Dict[str, object]:
        """
        Scrape the course pages and convert them into markdown sections.

        Args:
            url: Accepted for backward compatibility.
                NOTE(review): the value is currently ignored and the fixed
                page list is always scraped - confirm whether callers expect
                it to override that list.

        Returns:
            Dictionary with the keys 'title', 'sections' and 'navigation'.
            Each section has 'title', 'level', 'markdown' and 'id'; each
            navigation entry has 'title', 'level' and 'id'. If no section
            could be scraped at all, the built-in default course is returned.
        """
        course_data = {
            'title': CourseScraper._COURSE_TITLE,
            'sections': [],
            'navigation': []
        }
        for page_url, _label in CourseScraper._COURSE_PAGES:
            try:
                response = requests.get(page_url, timeout=10, headers={
                    'User-Agent': CourseScraper._USER_AGENT
                })
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')
                # Find main content
                main_content = soup.find('main', {'id': 'content'}) or soup.find('main') or soup.find('body')
                if not main_content:
                    continue
                # Sections are appended as they are produced, so anything
                # parsed before an error on this page is still kept.
                for section in CourseScraper._iter_sections(main_content):
                    course_data['sections'].append(section)
                    course_data['navigation'].append({
                        'title': section['title'],
                        'level': section['level'],
                        'id': section['id']
                    })
            except Exception as e:
                # Deliberately broad: one unreachable or malformed page must
                # not prevent the remaining pages from being scraped.
                print(f"Error scraping {page_url}: {e}")
                continue
        # If no content was scraped, return default
        if not course_data['sections']:
            return CourseScraper._get_default_course()
        return course_data

    @staticmethod
    def _iter_sections(main_content):
        """Yield one section dict per accepted h1/h2 heading below *main_content*.

        A heading that is empty or starts with 'Python Kurs' (the page title)
        closes the running section without opening a new one; content that
        follows it is skipped until the next accepted heading.
        """
        current_section = None
        markdown_content = []
        consumed = set()  # id() of <div>/<pre> elements already rendered as code

        for elem in main_content.find_all(['h1', 'h2', 'h3', 'h4', 'p', 'ul', 'li', 'div', 'pre', 'code']):
            tag_name = elem.name

            if tag_name in ('h1', 'h2'):
                if current_section and markdown_content:
                    current_section['markdown'] = '\n\n'.join(markdown_content)
                    yield current_section
                # Reset so a skipped heading cannot cause the section that was
                # just emitted to be emitted a second time later on.
                current_section = None
                markdown_content = []
                text = elem.get_text().strip()
                if text and not text.startswith('Python Kurs'):
                    level = int(tag_name[1])
                    current_section = {
                        'title': text,
                        'level': level,
                        'markdown': '',
                        'id': CourseScraper._slugify(text)
                    }
                    markdown_content = [f"{'#' * level} {text}"]
                continue

            if current_section is None or id(elem) in consumed:
                continue

            if tag_name == 'h3':
                text = elem.get_text().strip()
                if text:
                    markdown_content.append(f"\n### {text}")
            elif tag_name == 'h4':
                text = elem.get_text().strip()
                if text:
                    markdown_content.append(f"\n#### {text}")
            elif tag_name == 'p':
                text = elem.get_text().strip()
                # Very short paragraphs are treated as layout noise.
                if text and len(text) > 5:
                    markdown_content.append(text)
            elif tag_name == 'ul':
                list_items = []
                for li in elem.find_all('li', recursive=False):
                    li_text = li.get_text().strip()
                    if li_text:
                        list_items.append(f"- {li_text}")
                if list_items:
                    markdown_content.append('\n'.join(list_items))
            elif tag_name == 'div':
                block = CourseScraper._div_code_block(elem)
                if block:
                    markdown_content.append(block)
                    # find_all() also visits the nested <div>/<pre> wrappers
                    # of this block; mark them so the same code is not
                    # appended once more.
                    consumed.update(id(inner) for inner in elem.find_all(['div', 'pre']))
            elif tag_name == 'pre':
                code_elem = elem.find('code')
                if code_elem:
                    code_text = code_elem.get_text().strip()
                    if code_text:
                        # All preformatted code is fenced as Python.
                        markdown_content.append(f"```python\n{code_text}\n```")

        # Save last section
        if current_section:
            if markdown_content:
                current_section['markdown'] = '\n\n'.join(markdown_content)
            yield current_section

    @staticmethod
    def _div_code_block(elem):
        """Return a fenced Python code block for a code-carrying <div>, else ``None``."""
        # NOTE(review): BeautifulSoup may hand the class filter a single class
        # string rather than a list, in which case ' '.join(x) spaces out its
        # characters and this lookup never matches - confirm against the bs4
        # version in use. The class check on the <div> itself is the fallback.
        code_elem = elem.find('code', class_=lambda x: x and 'language-python' in ' '.join(x))
        if code_elem:
            code_text = code_elem.get_text().strip()
        elif 'language-python' in str(elem.get('class', [])):
            code_text = elem.get_text().strip()
        else:
            return None
        if not code_text:
            return None
        return f"```python\n{code_text}\n```"

    @staticmethod
    def _slugify(text: str) -> str:
        """Convert text to a URL-friendly slug (lowercase, hyphen-separated)."""
        text = text.lower()
        text = re.sub(r'[^\w\s-]', '', text)
        text = re.sub(r'[-\s]+', '-', text)
        return text.strip('-')

    @staticmethod
    def _get_default_course() -> Dict[str, object]:
        """Get the default course structure used when scraping yields nothing.

        Every section carries the original 'title', 'level', 'content' and
        'subsections' keys plus 'id' and 'markdown', so it has the same keys
        as a scraped section. A fresh structure is built on every call.
        """
        sections = []
        navigation = []
        for title, section_id, topics in CourseScraper._DEFAULT_OUTLINE:
            bullet_list = '\n'.join(f"- {topic}" for topic in topics)
            sections.append({
                'title': title,
                'level': 2,
                'id': section_id,
                'markdown': f"## {title}\n\n{bullet_list}",
                'content': list(topics),
                'subsections': []
            })
            navigation.append({'title': title, 'level': 2, 'id': section_id})
        return {
            'title': CourseScraper._COURSE_TITLE,
            'sections': sections,
            'navigation': navigation
        }

242
modules/doc_extractor.py Normal file
View File

@@ -0,0 +1,242 @@
"""
Module for extracting documentation from Python objects using pydoc and inspect.
"""
import pydoc
import inspect
from typing import Optional, Dict, Any
class DocExtractor:
    """
    Extracts documentation from Python objects.

    Supports:
    - Modules
    - Classes
    - Functions
    - Methods
    - Builtins
    - Any object accessible through pydoc
    """

    @staticmethod
    def extract_doc(object_name: str) -> Dict[str, Any]:
        """
        Extract documentation for a Python object.

        Args:
            object_name: Dot-separated path to the object
                (e.g., 'dict.update', 'os.path', 'builtins.BaseException')

        Returns:
            Dictionary containing:
            - 'original': Original English documentation (None on failure)
            - 'object_name': Name of the object as requested
            - 'object_type': 'module', 'class', 'function' (any routine,
              including builtins and method descriptors) or 'object'
            - 'signature': Signature string for classes and routines when
              inspect can determine one, otherwise None
            - 'error': Error message if extraction failed, otherwise None

        This method never raises; every failure is reported through 'error'.
        """
        try:
            if not object_name:
                raise ValueError("Could not resolve object: empty object name")

            obj = DocExtractor._resolve(object_name)
            if obj is None:
                raise ValueError(f"Could not resolve object: {object_name}")

            # Get the docstring
            docstring = inspect.getdoc(obj) or pydoc.getdoc(obj) or ""

            # Determine object type
            if inspect.ismodule(obj):
                obj_type = "module"
            elif inspect.isclass(obj):
                obj_type = "class"
            elif inspect.isroutine(obj):
                # isroutine() also covers builtin functions and method
                # descriptors such as len or dict.update.
                obj_type = "function"
            else:
                obj_type = "object"

            # Get signature if it's a callable
            signature = None
            if obj_type in ("class", "function"):
                try:
                    signature = str(inspect.signature(obj))
                except (ValueError, TypeError):
                    # Many C-implemented callables expose no signature.
                    pass

            # If docstring is empty, fall back to rendered help text
            if not docstring:
                docstring = DocExtractor._rendered_doc(obj, object_name)
            if not docstring:
                docstring = DocExtractor._help_doc(obj)

            return {
                'original': docstring,
                'object_name': object_name,
                'object_type': obj_type,
                'signature': signature,
                'error': None
            }
        except Exception as e:
            import traceback
            error_msg = str(e)
            # Don't expose full traceback to user, but log it
            print(f"Error extracting doc for {object_name}: {error_msg}")
            print(traceback.format_exc())
            return {
                'original': None,
                'object_name': object_name,
                'object_type': None,
                'signature': None,
                'error': f"Could not extract documentation: {error_msg}"
            }

    @staticmethod
    def _resolve(object_name: str):
        """Return the object named by *object_name*, or ``None`` if not found.

        A direct import is tried first; pydoc is the fallback for names that
        are not importable as "module.attribute", e.g. 'len' or 'dict.update'.
        Importing a module executes its top-level code, so *object_name*
        should come from a trusted or validated source.
        """
        parts = object_name.split('.')
        obj = None
        try:
            if len(parts) == 1:
                # Simple module name (e.g., 'asyncio')
                candidate = __import__(object_name)
                obj = candidate if inspect.ismodule(candidate) else None
            else:
                # Dotted name (e.g., 'os.path', 'collections.abc', 'builtins.len')
                attr_name = parts[-1]
                module = __import__('.'.join(parts[:-1]), fromlist=[attr_name])
                obj = getattr(module, attr_name)
        except Exception:
            # Any failure here only means "not importable this way".
            obj = None

        if obj is None:
            try:
                # pydoc.resolve() returns an (object, name) pair. Keeping the
                # pair itself as the object is what used to produce the
                # documentation of `tuple` for every name resolved here.
                obj, _ = pydoc.resolve(object_name)
            except (ImportError, AttributeError, ValueError):
                obj = None
        return obj

    @staticmethod
    def _rendered_doc(obj, object_name: str) -> str:
        """Return pydoc's plain-text rendering of *obj*, or '' if rendering fails."""
        try:
            help_text = pydoc.render_doc(obj, renderer=pydoc.plaintext)
        except Exception:
            return ""
        lines = help_text.split('\n')
        short_name = object_name.split('.')[-1]
        # Start at the first non-empty line that does not begin with the
        # object's own name.
        start_idx = 0
        for i, line in enumerate(lines):
            stripped = line.strip()
            if stripped and not stripped.startswith(short_name):
                start_idx = i
                break
        return '\n'.join(lines[start_idx:]).strip()

    @staticmethod
    def _help_doc(obj) -> str:
        """Return up to 500 characters of captured ``help(obj)`` output, or ''."""
        import io
        import sys
        buffer = io.StringIO()
        previous_stdout = sys.stdout
        sys.stdout = buffer
        try:
            help(obj)
        except Exception:
            return ""
        finally:
            # Always restore whatever stream was active before, even when
            # help() fails, so an outer redirection is not lost.
            sys.stdout = previous_stdout
        lines = buffer.getvalue().split('\n')
        return '\n'.join(
            [line for line in lines if line.strip() and not line.strip().startswith('Help on')]
        )[:500]

152
modules/module_list.py Normal file
View File

@@ -0,0 +1,152 @@
"""
Module for listing available Python modules and objects.
"""
import pydoc
import inspect
import sys
from typing import List, Dict, Any
class ModuleList:
    """
    Provides lists of available Python modules and objects for documentation.
    """

    # Candidate standard-library module names. dict.fromkeys() removes
    # repeated names while keeping order, so each module is listed once.
    _COMMON_MODULES = tuple(dict.fromkeys((
        'os', 'sys', 'json', 'datetime', 'collections', 'itertools',
        'functools', 'operator', 'string', 're', 'math', 'random',
        'statistics', 'decimal', 'fractions', 'array', 'bisect',
        'heapq', 'copy', 'pickle', 'sqlite3', 'hashlib', 'hmac',
        'secrets', 'uuid', 'pathlib', 'shutil', 'glob', 'fnmatch',
        'linecache', 'tempfile', 'fileinput', 'csv', 'configparser',
        'netrc', 'xdrlib', 'plistlib', 'codecs', 'unicodedata',
        'stringprep', 'readline', 'rlcompleter', 'struct',
        'types', 'copyreg', 'pprint', 'reprlib', 'enum', 'numbers',
        'collections.abc', 'io', 'argparse', 'getopt', 'logging',
        'getpass', 'curses', 'platform', 'errno', 'ctypes', 'threading',
        'multiprocessing', 'concurrent', 'subprocess', 'sched', 'queue',
        'select', 'selectors', 'asyncio', 'socket', 'ssl', 'email',
        'html', 'http', 'urllib', 'xml', 'webbrowser', 'cgi', 'cgitb',
        'wsgiref', 'xmlrpc', 'ipaddress', 'audioop', 'aifc',
        'sunau', 'wave', 'chunk', 'colorsys', 'imghdr', 'sndhdr',
        'ossaudiodev', 'gettext', 'locale', 'calendar', 'cmd', 'shlex',
        'tkinter', 'turtle', 'pydoc', 'doctest', 'unittest', 'test',
        'lib2to3', 'typing', 'dataclasses', 'contextlib', 'abc',
        'atexit', 'traceback', 'future', 'gc', 'inspect', 'site',
        'fpectl', 'distutils', 'ensurepip', 'venv', 'zipapp', 'faulthandler',
        'pdb', 'profile', 'pstats', 'timeit', 'trace', 'tracemalloc',
        'warnings', 'contextvars', 'weakref',
    )))

    @staticmethod
    def _import_module(module_name: str):
        """Import and return exactly the module called *module_name*.

        ``__import__('a.b')`` returns the top-level package ``a``;
        ``importlib.import_module`` returns ``a.b`` itself.
        """
        import importlib
        return importlib.import_module(module_name)

    @staticmethod
    def get_standard_modules() -> List[Dict[str, Any]]:
        """
        Get list of standard library modules.

        Names that cannot be imported on the running interpreter are skipped.

        Returns:
            List of dictionaries with the keys 'name', 'type' (always
            'module') and 'doc', sorted by name
        """
        modules = []
        for mod_name in ModuleList._COMMON_MODULES:
            try:
                mod = ModuleList._import_module(mod_name)
            except (ImportError, AttributeError):
                continue
            if inspect.ismodule(mod):
                modules.append({
                    'name': mod_name,
                    'type': 'module',
                    'doc': inspect.getdoc(mod) or ''
                })
        return sorted(modules, key=lambda x: x['name'])

    @staticmethod
    def get_builtin_objects() -> List[Dict[str, Any]]:
        """
        Get list of builtin objects (types, functions, etc.).

        Names starting with an underscore are left out. Everything that is
        not a function is reported with type 'type'.

        Returns:
            List of dictionaries with the keys 'name', 'type', 'full_name'
            and 'doc' (always empty), sorted by name
        """
        objects = []
        builtins_module = __import__('builtins')
        for name in dir(builtins_module):
            if name.startswith('_'):
                continue
            try:
                obj = getattr(builtins_module, name)
                is_function = inspect.isbuiltin(obj) or inspect.isfunction(obj)
                objects.append({
                    'name': name,
                    'type': 'function' if is_function else 'type',
                    'full_name': f'builtins.{name}',
                    'doc': ''
                })
            except Exception:
                continue
        return sorted(objects, key=lambda x: x['name'])

    @staticmethod
    def get_module_contents(module_name: str) -> List[Dict[str, Any]]:
        """
        Get list of public objects in a module.

        Args:
            module_name: Name of the module; dotted names such as 'xml.dom'
                refer to that submodule.

        Returns:
            List of dictionaries with the keys 'name', 'type' ('module',
            'class', 'function' or 'object'), 'full_name' and 'doc' (always
            empty), sorted by name. Empty if the module cannot be imported.
        """
        objects = []
        try:
            mod = ModuleList._import_module(module_name)
            if not inspect.ismodule(mod):
                return objects
            for name in dir(mod):
                if name.startswith('_'):
                    continue
                try:
                    obj = getattr(mod, name)
                    if inspect.ismodule(obj):
                        obj_type = 'module'
                    elif inspect.isclass(obj):
                        obj_type = 'class'
                    elif inspect.isfunction(obj) or inspect.ismethod(obj) or inspect.isbuiltin(obj):
                        obj_type = 'function'
                    else:
                        obj_type = 'object'
                    objects.append({
                        'name': name,
                        'type': obj_type,
                        'full_name': f"{module_name}.{name}",
                        'doc': ''
                    })
                except Exception:
                    # An attribute that fails on access is simply left out.
                    continue
        except Exception as e:
            print(f"Error getting module contents for {module_name}: {e}")
        return sorted(objects, key=lambda x: x['name'])

288
modules/translator.py Normal file
View File

@@ -0,0 +1,288 @@
"""
Translation module with extensible backend support.
To add a new translation provider:
1. Create a class that inherits from TranslationBackend
2. Implement the translate() method
3. Register it in the TranslationService class
"""
from abc import ABC, abstractmethod
from typing import Optional
import os
class TranslationBackend(ABC):
    """Interface every translation provider has to implement."""

    @abstractmethod
    def translate(self, text: str, target_lang: str, source_lang: str = "en") -> Optional[str]:
        """
        Translate *text* into *target_lang*.

        Args:
            text: The text that should be translated
            target_lang: Language code to translate into (e.g., 'de', 'fr', 'es')
            source_lang: Language code of *text* (default: 'en')

        Returns:
            The translated text, or None when the translation failed
        """
class HuggingFaceTranslator(TranslationBackend):
    """
    Translation backend using HuggingFace transformers.

    Uses Helsinki-NLP MarianMT models. Every configured model translates from
    English, and the model is loaded lazily on the first translate() call.
    """

    # Target language code -> pretrained model name.
    _MODEL_MAP = {
        'de': 'Helsinki-NLP/opus-mt-en-de',
        'fr': 'Helsinki-NLP/opus-mt-en-fr',
        'es': 'Helsinki-NLP/opus-mt-en-es',
        'it': 'Helsinki-NLP/opus-mt-en-it',
        'pt': 'Helsinki-NLP/opus-mt-en-pt',
        'ru': 'Helsinki-NLP/opus-mt-en-ru',
    }

    def __init__(self):
        self._model = None
        self._tokenizer = None
        self._model_name = None
        self._device = 'cpu'  # Default device

    def _load_model(self, target_lang: str):
        """Load tokenizer and model for *target_lang* unless already loaded.

        Raises:
            ImportError: If transformers/torch or sentencepiece is missing.
            ValueError: If no model is configured for *target_lang*.
        """
        try:
            from transformers import MarianMTModel, MarianTokenizer
            import torch
        except ImportError as e:
            raise ImportError(
                "transformers library not installed. "
                "Install with: pip install transformers torch"
            ) from e
        try:
            # Check for SentencePiece (required by MarianTokenizer)
            import sentencepiece  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "SentencePiece library not installed. "
                "Install with: pip install sentencepiece"
            ) from e

        model_name = self._MODEL_MAP.get(target_lang)
        if not model_name:
            raise ValueError(f"No model available for language: {target_lang}")

        # Only reload if language changed
        if self._model_name == model_name:
            return

        device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # Build everything in locals first. If any step raises, the previously
        # loaded tokenizer/model pair stays intact instead of being left half
        # replaced.
        tokenizer = MarianTokenizer.from_pretrained(model_name)
        try:
            if device == 'cpu':
                model = MarianMTModel.from_pretrained(model_name)
                model.eval()
            else:
                try:
                    # Try loading with device_map if supported
                    model = MarianMTModel.from_pretrained(
                        model_name,
                        device_map='auto'
                    )
                    model.eval()
                    # Update device based on where model actually ended up
                    actual_device = next(model.parameters()).device.type
                    device = actual_device if actual_device in ['cuda', 'cpu'] else 'cpu'
                except (TypeError, ValueError):
                    # Fallback: load to CPU first, then move
                    model = MarianMTModel.from_pretrained(model_name)
                    model.eval()
                    try:
                        model = model.to(device)
                    except Exception:
                        # If moving fails, keep on CPU
                        device = 'cpu'
        except Exception as e:
            # Ultimate fallback: load to CPU
            print(f"Warning: Error loading model to {device}, using CPU: {e}")
            model = MarianMTModel.from_pretrained(model_name)
            model.eval()
            device = 'cpu'

        self._tokenizer = tokenizer
        self._model = model
        self._model_name = model_name
        self._device = device

    def translate(self, text: str, target_lang: str, source_lang: str = "en") -> Optional[str]:
        """Translate using HuggingFace model.

        The text is split on blank lines and then on single newlines; each
        non-empty line is translated on its own so the layout is preserved.
        A line that fails to translate is kept in its original form.

        Args:
            text: Text to translate.
            target_lang: Target language code; must be a key of the model map.
            source_lang: Accepted for interface compatibility but not used;
                every configured model translates from English.

        Returns:
            The translated text, "" for empty input, or None if the model
            could not be loaded or another unexpected error occurred.
        """
        if not text:
            return ""
        try:
            self._load_model(target_lang)
            import torch

            translated_paragraphs = []
            for para in text.split('\n\n'):
                if not para.strip():
                    translated_paragraphs.append(para)
                    continue
                translated_lines = []
                for line in para.split('\n'):
                    if not line.strip():
                        translated_lines.append(line)
                        continue
                    try:
                        # Tokenize and move to device
                        inputs = self._tokenizer(
                            [line],
                            return_tensors="pt",
                            padding=True,
                            truncation=True,
                            max_length=512
                        ).to(self._device)
                        # Generate translation
                        with torch.no_grad():
                            translated = self._model.generate(**inputs, max_length=512)
                        translated_lines.append(
                            self._tokenizer.decode(translated[0], skip_special_tokens=True)
                        )
                    except Exception as e:
                        print(f"Error translating sentence: {e}")
                        translated_lines.append(line)  # Fallback to original
                translated_paragraphs.append('\n'.join(translated_lines))
            return '\n\n'.join(translated_paragraphs)
        except Exception as e:
            import traceback
            print(f"Translation error: {e}")
            print(traceback.format_exc())
            return None
class GoogleTranslateBackend(TranslationBackend):
    """
    Translation backend using Google Translate via the ``googletrans`` package.

    Requires GOOGLE_TRANSLATE_API_KEY environment variable to be set. The key
    is only checked for presence; it is not passed on to the ``googletrans``
    Translator.
    """

    def __init__(self):
        """Read the API key from the environment.

        Raises:
            ValueError: If GOOGLE_TRANSLATE_API_KEY is not set.
        """
        self.api_key = os.getenv('GOOGLE_TRANSLATE_API_KEY')
        if not self.api_key:
            raise ValueError("GOOGLE_TRANSLATE_API_KEY environment variable not set")

    def translate(self, text: str, target_lang: str, source_lang: str = "en") -> Optional[str]:
        """Translate using Google Translate.

        Returns:
            The translated text, or None if the translation call failed.

        Raises:
            ImportError: If the ``googletrans`` package is not installed.
        """
        try:
            from googletrans import Translator
        except ImportError as exc:
            raise ImportError(
                "googletrans library not installed. Install with: pip install googletrans==4.0.0rc1"
            ) from exc
        try:
            translator = Translator()
            result = translator.translate(text, dest=target_lang, src=source_lang)
            return result.text
        except Exception as exc:
            print(f"Google Translate error: {exc}")
            return None
class DeepLTranslator(TranslationBackend):
    """
    Translation backend using DeepL API.

    Requires DEEPL_API_KEY environment variable.
    """

    def __init__(self):
        """Read the API key from the environment.

        Raises:
            ValueError: If DEEPL_API_KEY is not set.
        """
        self.api_key = os.getenv('DEEPL_API_KEY')
        if not self.api_key:
            raise ValueError("DEEPL_API_KEY environment variable not set")

    def translate(self, text: str, target_lang: str, source_lang: str = "en") -> Optional[str]:
        """Translate using DeepL API.

        Language codes are upper-cased before they are sent to DeepL.

        Returns:
            The translated text, or None if the translation call failed.

        Raises:
            ImportError: If the ``deepl`` package is not installed.
        """
        try:
            import deepl
        except ImportError as exc:
            raise ImportError("deepl library not installed. Install with: pip install deepl") from exc
        try:
            translator = deepl.Translator(self.api_key)
            result = translator.translate_text(
                text,
                target_lang=target_lang.upper(),
                source_lang=source_lang.upper(),
            )
            return result.text
        except Exception as exc:
            print(f"DeepL translation error: {exc}")
            return None
class TranslationService:
"""
Translation service that manages multiple translation backends.
Automatically selects the best available backend based on configuration.
"""
def __init__(self, backend: Optional[str] = None):
"""
Initialize translation service.
Args:
backend: Backend name ('huggingface', 'google', 'deepl').
If None, auto-selects based on availability.
"""
self.backend_name = backend or self._auto_select_backend()
self.backend = self._create_backend(self.backend_name)
def _auto_select_backend(self) -> str:
"""Auto-select the best available backend."""
# Priority: DeepL > Google > HuggingFace
if os.getenv('DEEPL_API_KEY'):
return 'deepl'
elif os.getenv('GOOGLE_TRANSLATE_API_KEY'):
return 'google'
else:
return 'huggingface' # Default to local model
def _create_backend(self, backend_name: str) -> TranslationBackend:
"""Create a translation backend instance."""
backends = {
'huggingface': HuggingFaceTranslator,
'google': GoogleTranslateBackend,
'deepl': DeepLTranslator,
}
backend_class = backends.get(backend_name.lower())
if not backend_class:
raise ValueError(f"Unknown backend: {backend_name}")
try:
return backend_class()
except Exception as e:
# Fallback to HuggingFace if other backends fail
if backend_name != 'huggingface':
print(f"Failed to initialize {backend_name}, falling back to HuggingFace: {e}")
return HuggingFaceTranslator()
raise
def translate(self, text: str, target_lang: str, source_lang: str = "en") -> Optional[str]:
"""Translate text using the configured backend."""
if not text:
return ""
return self.backend.translate(text, target_lang, source_lang)