2403 lines
87 KiB
Python
2403 lines
87 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Bytecode VM with Java/C++-like syntax
|
|
Includes disassembler and debugger with improved type safety and error handling
|
|
"""
|
|
|
|
import struct
|
|
import sys
|
|
import os
|
|
from enum import IntEnum
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional, Any, Dict, Tuple, Union
|
|
import re
|
|
import traceback
|
|
from collections import defaultdict
|
|
|
|
# ============================================================================
|
|
# TOKEN DEFINITIONS
|
|
# ============================================================================
|
|
|
|
class TokenType(IntEnum):
    """Every kind of token the lexer can emit.

    The numeric values are grouped by category (keywords from 0, literals
    from 20, operators from 30, delimiters from 60, end-of-file at 99) and
    must stay stable because they are plain integers to any consumer.
    """

    # --- Keywords: declarations and control flow ---------------------------
    FUN = 0
    IF = 1
    ELSE = 2
    WHILE = 3
    FOR = 4
    RETURN = 5

    # --- Keywords: type names and boolean words ----------------------------
    UINT = 6
    INT = 7
    FLOAT = 8
    BOOL = 9
    CHAR = 10
    STR = 11
    TRUE = 12
    FALSE = 13
    U8 = 14
    U16 = 15
    U32 = 16
    I8 = 17
    I16 = 18
    I32 = 19

    # --- Identifiers and literal values ------------------------------------
    IDENTIFIER = 20
    INT_LITERAL = 21
    FLOAT_LITERAL = 22
    STRING_LITERAL = 23
    BOOL_LITERAL = 24

    # --- Arithmetic and assignment operators -------------------------------
    PLUS = 30
    MINUS = 31
    STAR = 32
    SLASH = 33
    PERCENT = 34
    ASSIGN = 35
    PLUS_ASSIGN = 36
    MINUS_ASSIGN = 37
    STAR_ASSIGN = 38
    SLASH_ASSIGN = 39

    # --- Comparison and logical operators ----------------------------------
    EQ = 40
    NEQ = 41
    LT = 42
    GT = 43
    LE = 44
    GE = 45
    AND = 46
    OR = 47
    NOT = 48
    INCREMENT = 49
    DECREMENT = 50

    # --- Delimiters --------------------------------------------------------
    LPAREN = 60
    RPAREN = 61
    LBRACE = 62
    RBRACE = 63
    SEMICOLON = 64
    COMMA = 65

    # --- End of input ------------------------------------------------------
    EOF = 99
|
|
|
|
@dataclass
class Token:
    """A single lexical token together with its source location."""

    type: TokenType      # category of the token
    value: Any           # payload: keyword/operator text or the literal's value
    line: int            # 1-based line on which the token starts
    col: int             # 1-based column at which the token starts
    filename: str = ""   # originating file name, used in diagnostics

    def __str__(self):
        """Return a compact description that omits the file name."""
        kind = self.type.name
        return f"Token({kind}, {self.value!r}, line {self.line}, col {self.col})"
|
|
|
|
# ============================================================================
|
|
# LEXER
|
|
# ============================================================================
|
|
|
|
class Lexer:
    """Hand-written scanner that turns source text into ``Token`` objects.

    Call :meth:`next_token` repeatedly; it returns an ``EOF`` token once the
    input is exhausted.  All lexical errors are raised as ``SyntaxError``
    carrying ``filename:line:col`` of the current scan position.
    """

    def __init__(self, source: str, filename: str = ""):
        """Prepare to scan *source*; *filename* is only used in diagnostics."""
        self.source = source
        self.filename = filename
        self.position = 0
        self.line = 1
        self.col = 1
        self.current_char = self.source[0] if source else None

        # Keyword mapping
        self.keywords = {
            'fun': TokenType.FUN,
            'if': TokenType.IF,
            'else': TokenType.ELSE,
            'while': TokenType.WHILE,
            'for': TokenType.FOR,
            'return': TokenType.RETURN,
            'uint': TokenType.UINT,
            'int': TokenType.INT,
            'float': TokenType.FLOAT,
            'bool': TokenType.BOOL,
            'char': TokenType.CHAR,
            'str': TokenType.STR,
            'true': TokenType.TRUE,
            'false': TokenType.FALSE,
            'u8': TokenType.U8,
            'u16': TokenType.U16,
            'u32': TokenType.U32,
            'i8': TokenType.I8,
            'i16': TokenType.I16,
            'i32': TokenType.I32,
        }

        # Characters that always form a complete token on their own.
        self._single_char_tokens = {
            '(': TokenType.LPAREN,
            ')': TokenType.RPAREN,
            '{': TokenType.LBRACE,
            '}': TokenType.RBRACE,
            ';': TokenType.SEMICOLON,
            ',': TokenType.COMMA,
        }

        # Operators: first char -> (type when alone, {second char: type}).
        self._operators = {
            '+': (TokenType.PLUS, {'+': TokenType.INCREMENT,
                                   '=': TokenType.PLUS_ASSIGN}),
            '-': (TokenType.MINUS, {'-': TokenType.DECREMENT,
                                    '=': TokenType.MINUS_ASSIGN}),
            '*': (TokenType.STAR, {'=': TokenType.STAR_ASSIGN}),
            '/': (TokenType.SLASH, {'=': TokenType.SLASH_ASSIGN}),
            '%': (TokenType.PERCENT, {}),
            '=': (TokenType.ASSIGN, {'=': TokenType.EQ}),
            '!': (TokenType.NOT, {'=': TokenType.NEQ}),
            '<': (TokenType.LT, {'=': TokenType.LE}),
            '>': (TokenType.GT, {'=': TokenType.GE}),
        }

    def error(self, message: str):
        """Raise a lexer error with position information"""
        raise SyntaxError(f"{self.filename}:{self.line}:{self.col}: {message}")

    def _peek(self):
        """Return the character after the current one, or None at the end."""
        nxt = self.position + 1
        return self.source[nxt] if nxt < len(self.source) else None

    def advance(self):
        """Consume the current character and move to the next one.

        Line/column bookkeeping is applied for *every* consumed character,
        including the last one, so positions reported at end of input point
        just past the final character.  Calling this at end of input is a
        no-op.
        """
        if self.current_char is None:
            return

        if self.current_char == '\n':
            self.line += 1
            self.col = 1
        else:
            self.col += 1

        self.position += 1
        if self.position < len(self.source):
            self.current_char = self.source[self.position]
        else:
            self.current_char = None

    def skip_whitespace(self):
        """Skip whitespace, ``//`` line comments and ``/* */`` block comments.

        Raises:
            SyntaxError: if a block comment is not closed before end of input.
        """
        while self.current_char is not None:
            ch = self.current_char
            if ch in ' \t\r\n':
                self.advance()
            elif ch == '/' and self._peek() == '/':
                # Line comment: stop at the newline, which the next
                # iteration consumes as ordinary whitespace.
                while self.current_char is not None and self.current_char != '\n':
                    self.advance()
            elif ch == '/' and self._peek() == '*':
                self.advance()  # Skip /
                self.advance()  # Skip *
                while (self.current_char is not None and
                       not (self.current_char == '*' and self._peek() == '/')):
                    self.advance()
                if self.current_char is None:
                    self.error("Unterminated multi-line comment")
                self.advance()  # Skip *
                self.advance()  # Skip /
            else:
                # A lone '/' is an operator, anything else starts a token.
                break

    def number(self):
        """Scan an integer or float literal starting at the current digit.

        Returns:
            An ``INT_LITERAL`` token, or a ``FLOAT_LITERAL`` token when the
            digits are followed by ``.`` (fraction digits are optional).

        Raises:
            SyntaxError: if the scanned text contains digit-like characters
                that Python cannot convert to a number.
        """
        start_line, start_col = self.line, self.col
        begin = self.position

        while self.current_char is not None and self.current_char.isdigit():
            self.advance()

        is_float = self.current_char == '.'
        if is_float:
            self.advance()
            while self.current_char is not None and self.current_char.isdigit():
                self.advance()

        text = self.source[begin:self.position]
        try:
            value = float(text) if is_float else int(text)
        except ValueError:
            # str.isdigit() accepts characters such as superscripts that
            # int()/float() reject; report them as a lexical error.
            self.error(f"Invalid numeric literal: '{text}'")

        kind = TokenType.FLOAT_LITERAL if is_float else TokenType.INT_LITERAL
        return Token(kind, value, start_line, start_col, self.filename)

    def string(self):
        """Scan a double-quoted string literal and decode its escapes.

        Recognised escapes are ``\\n \\t \\r \\0 \\\\ \\"``; any other escape
        is kept verbatim (backslash included).

        Raises:
            SyntaxError: if the input ends before the closing quote,
                including when it ends right after a backslash.
        """
        escapes = {'n': '\n', 't': '\t', 'r': '\r', '0': '\0',
                   '\\': '\\', '"': '"'}
        start_line, start_col = self.line, self.col
        self.advance()  # Skip opening quote
        pieces = []

        while self.current_char is not None and self.current_char != '"':
            if self.current_char == '\\':
                self.advance()
                if self.current_char is None:
                    # Input ended in the middle of an escape sequence.
                    self.error("Unterminated string literal")
                pieces.append(escapes.get(self.current_char,
                                          '\\' + self.current_char))
            else:
                pieces.append(self.current_char)
            self.advance()

        if self.current_char != '"':
            self.error("Unterminated string literal")

        self.advance()  # Skip closing quote
        return Token(TokenType.STRING_LITERAL, ''.join(pieces),
                     start_line, start_col, self.filename)

    def identifier(self):
        """Scan an identifier, keyword, or boolean literal.

        ``true``/``false`` are returned as ``BOOL_LITERAL`` tokens carrying a
        Python ``bool``; other keywords keep their spelling as the value.
        """
        start_line, start_col = self.line, self.col
        begin = self.position

        while (self.current_char is not None and
               (self.current_char.isalnum() or self.current_char == '_')):
            self.advance()

        word = self.source[begin:self.position]
        token_type = self.keywords.get(word, TokenType.IDENTIFIER)

        if token_type == TokenType.TRUE:
            return Token(TokenType.BOOL_LITERAL, True, start_line, start_col, self.filename)
        if token_type == TokenType.FALSE:
            return Token(TokenType.BOOL_LITERAL, False, start_line, start_col, self.filename)

        return Token(token_type, word, start_line, start_col, self.filename)

    def next_token(self):
        """Return the next token, or an ``EOF`` token at end of input.

        Raises:
            SyntaxError: on an unexpected character, a lone ``&`` or ``|``,
                or any error raised by the literal scanners.
        """
        self.skip_whitespace()

        ch = self.current_char
        if ch is None:
            return Token(TokenType.EOF, None, self.line, self.col, self.filename)

        start_line, start_col = self.line, self.col

        # Single character tokens
        if ch in self._single_char_tokens:
            self.advance()
            return Token(self._single_char_tokens[ch], ch,
                         start_line, start_col, self.filename)

        # One- or two-character operators
        if ch in self._operators:
            alone, followers = self._operators[ch]
            self.advance()
            combined = followers.get(self.current_char)
            if combined is None:
                return Token(alone, ch, start_line, start_col, self.filename)
            text = ch + self.current_char
            self.advance()
            return Token(combined, text, start_line, start_col, self.filename)

        # Logical operators exist only in their doubled form.
        if ch == '&' or ch == '|':
            self.advance()
            if self.current_char == ch:
                self.advance()
                kind = TokenType.AND if ch == '&' else TokenType.OR
                return Token(kind, ch * 2, start_line, start_col, self.filename)
            word = 'AND' if ch == '&' else 'OR'
            self.error(f"Single '{ch}' not supported, use '{ch}{ch}' for logical {word}")

        # Numbers
        if ch.isdigit():
            return self.number()

        # Strings
        if ch == '"':
            return self.string()

        # Identifiers
        if ch.isalpha() or ch == '_':
            return self.identifier()

        self.error(f"Unexpected character: '{ch}'")
|
|
|
|
# ============================================================================
|
|
# OPCODES
|
|
# ============================================================================
|
|
|
|
class Opcode(IntEnum):
|
|
PUSH_CONST = 0x01
|
|
PUSH_INT = 0x02
|
|
PUSH_FLOAT = 0x03
|
|
PUSH_STR = 0x04
|
|
|
|
LOAD_LOCAL = 0x10
|
|
STORE_LOCAL = 0x11
|
|
|
|
ADD = 0x20
|
|
SUB = 0x21
|
|
MUL = 0x22
|
|
DIV = 0x23
|
|
MOD = 0x24
|
|
NEG = 0x25
|
|
BIT_AND = 0x26
|
|
BIT_OR = 0x27
|
|
BIT_XOR = 0x28
|
|
SHL = 0x29
|
|
SHR = 0x2A
|
|
|
|
FADD = 0x30
|
|
FSUB = 0x31
|
|
FMUL = 0x32
|
|
FDIV = 0x33
|
|
FNEG = 0x34
|
|
|
|
CMP_EQ = 0x40
|
|
CMP_NEQ = 0x41
|
|
CMP_LT = 0x42
|
|
CMP_GT = 0x43
|
|
CMP_LE = 0x44
|
|
CMP_GE = 0x45
|
|
|
|
JMP = 0x50
|
|
JMP_IF = 0x51
|
|
JMP_IF_NOT = 0x52
|
|
|
|
CALL = 0x60
|
|
RET = 0x61
|
|
|
|
CONST_CAST = 0x70
|
|
TRUNC = 0x71
|
|
TO_FLOAT = 0x72
|
|
TO_INT = 0x73
|
|
|
|
DUP = 0x80
|
|
POP = 0x81
|
|
|
|
PRINT = 0x90
|
|
HALT = 0xA0
|
|
|
|
# Type codes
|
|
class TypeCode(IntEnum):
    """Tags identifying the runtime type of a value."""

    # Integers: signed/unsigned pairs in increasing width
    I8 = 0x01
    U8 = 0x02
    I16 = 0x03
    U16 = 0x04
    I32 = 0x05
    U32 = 0x06

    # Floating point
    F32 = 0x07

    # Non-numeric types
    BOOL = 0x08
    CHAR = 0x09
    STR = 0x0A
|
|
|
|
# ============================================================================
|
|
# VALUE REPRESENTATION
|
|
# ============================================================================
|
|
|
|
@dataclass
class Value:
    """Runtime value container with type safety

    Pairs a ``TypeCode`` tag with the Python object holding the payload and
    validates on construction that the payload is acceptable for the tag.

    Raises:
        TypeError: from the constructor when ``data`` does not match
            ``type_code`` (wrong Python type, or out of range for the
            8/16-bit integer types and negative for U32).
    """
    type_code: TypeCode
    data: Any

    # Inclusive (low, high) bounds per integer type; None means unbounded.
    # I32 is intentionally left unbounded and U32 only has a lower bound,
    # matching the checks this class has always applied.
    _INT_BOUNDS = {
        TypeCode.I8: (-128, 127),
        TypeCode.U8: (0, 255),
        TypeCode.I16: (-32768, 32767),
        TypeCode.U16: (0, 65535),
        TypeCode.I32: (None, None),
        TypeCode.U32: (0, None),
    }

    # Type codes whose payload only needs an isinstance() check.
    _PAYLOAD_TYPES = {
        TypeCode.F32: float,
        TypeCode.BOOL: bool,
        TypeCode.STR: str,
    }

    def __post_init__(self):
        """Validate value data matches type"""
        self._validate_type()

    def _validate_type(self):
        """Raise ``TypeError`` unless ``data`` is valid for ``type_code``.

        Type codes without a known rule are accepted unchecked.
        """
        code, payload = self.type_code, self.data

        if code in self._INT_BOUNDS:
            low, high = self._INT_BOUNDS[code]
            valid = (isinstance(payload, int)
                     and (low is None or payload >= low)
                     and (high is None or payload <= high))
        elif code == TypeCode.CHAR:
            valid = isinstance(payload, str) and len(payload) == 1
        elif code in self._PAYLOAD_TYPES:
            valid = isinstance(payload, self._PAYLOAD_TYPES[code])
        else:
            valid = True

        if not valid:
            raise TypeError(f"Value {self.data} is not valid for type {self.type_code.name}")

    def to_bool(self) -> bool:
        """Convert to boolean

        BOOL returns the payload itself, CHAR is true unless it is the NUL
        character, every other known type uses Python truthiness, and an
        unknown type code yields ``False``.
        """
        code = self.type_code
        if code == TypeCode.BOOL:
            return self.data
        if code == TypeCode.CHAR:
            return bool(ord(self.data))
        if code in self._INT_BOUNDS or code in (TypeCode.F32, TypeCode.STR):
            return bool(self.data)
        return False

    def to_int(self) -> int:
        """Convert to integer

        Floats are truncated toward zero, booleans map to 0/1, characters to
        their code point; strings and unknown type codes yield 0.
        """
        code = self.type_code
        if code in self._INT_BOUNDS or code == TypeCode.F32:
            return int(self.data)
        if code == TypeCode.BOOL:
            return 1 if self.data else 0
        if code == TypeCode.CHAR:
            return ord(self.data)
        return 0

    def to_float(self) -> float:
        """Convert to float

        Characters convert via their code point, consistent with
        :meth:`to_int`; every other payload is passed to ``float()``.

        Raises:
            ValueError: for a STR payload that is not a numeric string.
        """
        if self.type_code == TypeCode.CHAR:
            # float('a') would raise; use the code point like to_int() does.
            return float(ord(self.data))
        return float(self.data)

    def get_type_name(self) -> str:
        """Get human-readable type name"""
        return self.type_code.name

    def __repr__(self):
        return f"Value({self.type_code.name}, {self.data!r})"
|
|
|
|
# ============================================================================
|
|
# DISASSEMBLER
|
|
# ============================================================================
|
|
|
|
class Disassembler:
    """Render a compiled ``.popclass`` image as annotated assembly text.

    :meth:`disassemble` parses the header, constant pool and function table
    (filling ``self.constants`` and ``self.functions``) and then lists the
    code of every function.  :meth:`disassemble_function` can be called on
    its own for a function-table entry once the tables have been parsed.
    """

    def __init__(self, bytecode: bytes, filename: str = ""):
        """Remember the image; *filename* is only used in error messages."""
        self.bytecode = bytecode
        self.filename = filename
        self.ip = 0
        self.constants = []
        self.functions = []

    def error(self, message: str):
        """Raise a disassembler error with position information"""
        raise ValueError(f"{self.filename}:0x{self.ip:08x}: {message}")

    def _read(self, fmt: str):
        """Decode one ``struct`` field at ``self.ip`` and advance past it.

        Raises ``ValueError`` (via :meth:`error`) when the image is too
        short, instead of leaking ``struct.error``/``IndexError``.
        """
        size = struct.calcsize(fmt)
        chunk = self.bytecode[self.ip:self.ip + size]
        if len(chunk) != size:
            self.error("Unexpected end of file")
        self.ip += size
        return struct.unpack(fmt, chunk)[0]

    def disassemble(self) -> str:
        """Disassemble entire .popclass file

        Safe to call repeatedly: parsing state is reset on every call.

        Returns:
            The full listing as one newline-joined string.

        Raises:
            ValueError: on a bad magic number, an unknown constant type, or
                a truncated header / constant pool / function table.
        """
        # Start from a clean slate so a second call does not append to the
        # tables or continue from a stale offset.
        self.ip = 0
        self.constants = []
        self.functions = []
        output = []

        # Parse header
        magic = self.bytecode[self.ip:self.ip + 4]
        if magic != b'POPC':
            self.error(f"Invalid .popclass file magic: {magic}")
        self.ip += 4

        version_major = self._read('<H')
        version_minor = self._read('<H')
        self._read('<I')  # Reserved

        output.append(f"; POP Class File Version {version_major}.{version_minor}")
        output.append(f"; Magic: {magic}")
        output.append("")

        # Parse constants
        const_count = self._read('<I')
        output.append("; Constant Pool")
        output.append(f"; Count: {const_count}")

        for i in range(const_count):
            const_type = self._read('<B')

            if const_type == 0:  # int
                width = self._read('<B')
                signed = self._read('<B')
                # Decode the payload with the signedness the entry declares
                # so large unsigned constants are not shown as negative.
                value = self._read('<i' if signed else '<I')

                type_str = f"i{width}" if signed else f"u{width}"
                self.constants.append(('int', value, type_str))
                output.append(f"; const[{i}] = {type_str} {value}")

            elif const_type == 1:  # float
                value = self._read('<f')
                self.constants.append(('float', value, 'float'))
                output.append(f"; const[{i}] = float {value}")

            elif const_type == 2:  # str
                length = self._read('<I')
                str_bytes = self.bytecode[self.ip:self.ip + length]
                if len(str_bytes) != length:
                    self.error("Unexpected end of file in string constant")
                self.ip += length
                string_value = str_bytes.decode('utf-8')
                self.constants.append(('str', string_value, 'str'))
                output.append(f'; const[{i}] = str "{string_value}"')

            else:
                self.error(f"Unknown constant type: 0x{const_type:02x}")

        output.append("")

        # Parse functions
        func_count = self._read('<I')
        output.append("; Function Table")
        output.append(f"; Count: {func_count}")

        for i in range(func_count):
            name_idx = self._read('<I')
            arg_count = self._read('<B')
            local_count = self._read('<B')
            self._read('<H')  # Reserved
            code_size = self._read('<I')
            code_offset = self._read('<I')

            self.functions.append({
                'index': i,
                'name_idx': name_idx,
                'arg_count': arg_count,
                'local_count': local_count,
                'code_size': code_size,
                'code_offset': code_offset,
            })

            # 0xFFFFFFFF marks a function without a name constant.
            name = f"func_{i}" if name_idx == 0xFFFFFFFF else f"const[{name_idx}]"
            output.append(f"; func[{i}]: {name}, args={arg_count}, locals={local_count}, "
                          f"code_size={code_size}, offset=0x{code_offset:08x}")

        output.append("")

        # Disassemble code for each function
        for func in self.functions:
            output.append(f"; Function {func['index']}")
            output.extend(self.disassemble_function(func))
            output.append("")

        return '\n'.join(output)

    def disassemble_function(self, func: Dict) -> List[str]:
        """Disassemble a single function

        Args:
            func: a function-table entry with ``code_offset`` and
                ``code_size`` keys (absolute byte offset and length).

        Returns:
            One formatted line per instruction.  Decoding stops at the first
            malformed instruction, which is reported as an ``ERROR`` line
            carrying that instruction's own address.
        """
        output = []

        ip = func['code_offset']
        end_ip = ip + func['code_size']

        def take(fmt):
            """Decode one operand field at the local ip and advance it."""
            nonlocal ip
            size = struct.calcsize(fmt)
            (value,) = struct.unpack(fmt, self.bytecode[ip:ip + size])
            ip += size
            return value

        while ip < end_ip:
            # Save current IP for this instruction
            current_ip = ip

            try:
                opcode = take('<B')
                mnemonic = self.get_opcode_mnemonic(opcode)
                operands = []

                if opcode == Opcode.PUSH_CONST:
                    const_idx = take('<I')
                    if const_idx >= len(self.constants):
                        # Raised directly (not via self.error) because
                        # self.ip still points into the header tables.
                        raise ValueError(f"Constant index {const_idx} out of range")
                    const_type, value, type_str = self.constants[const_idx]
                    operands.append(f"const[{const_idx}] ; {type_str} {value}")

                elif opcode == Opcode.PUSH_INT:
                    width = take('<B')
                    value = take('<i')
                    operands.append(f"i{width} {value}")

                elif opcode == Opcode.PUSH_FLOAT:
                    value = take('<f')
                    operands.append(f"{value}")

                elif opcode == Opcode.PUSH_STR:
                    const_idx = take('<I')
                    if const_idx >= len(self.constants):
                        raise ValueError(f"Constant index {const_idx} out of range")
                    const_type, value, type_str = self.constants[const_idx]
                    operands.append(f"const[{const_idx}] ; \"{value}\"")

                elif opcode in (Opcode.LOAD_LOCAL, Opcode.STORE_LOCAL):
                    local_idx = take('<H')
                    operands.append(f"local[{local_idx}]")

                elif opcode in (Opcode.JMP, Opcode.JMP_IF, Opcode.JMP_IF_NOT):
                    offset = take('<i')
                    # 5 = opcode byte + 4-byte operand: the target is shown
                    # relative to the end of this instruction.
                    target_ip = current_ip + 5 + offset
                    operands.append(f"0x{target_ip:08x} ; offset={offset}")

                elif opcode == Opcode.CALL:
                    func_idx = take('<H')
                    arg_count = take('<B')
                    if func_idx >= len(self.functions):
                        raise ValueError(f"Function index {func_idx} out of range")
                    operands.append(f"func[{func_idx}], {arg_count}")

                elif opcode == Opcode.RET:
                    has_value = take('<B')
                    operands.append(f"{has_value}")

                # Every other opcode is decoded without operands.

                operands_str = ', '.join(operands) if operands else ''
                output.append(f"0x{current_ip:08x}: {mnemonic:15} {operands_str}")

            except (ValueError, IndexError, struct.error) as e:
                output.append(f"0x{current_ip:08x}: ERROR: {e}")
                break

        return output

    def get_opcode_mnemonic(self, opcode: int) -> str:
        """Convert opcode to mnemonic string"""
        try:
            return Opcode(opcode).name
        except ValueError:
            return f"UNKNOWN(0x{opcode:02x})"
|
|
|
|
# ============================================================================
|
|
# DEBUGGER
|
|
# ============================================================================
|
|
|
|
class Debugger:
    """Interactive command-line debugger that drives a VM instruction by
    instruction.

    The VM object is expected to expose ``ip``, ``bytecode``, ``halted``,
    ``current_frame`` (with ``stack`` and ``locals``), ``call_stack`` and
    ``execute_instruction()``; those are the only members used here.
    """

    def __init__(self, vm):
        """Attach to *vm*; no breakpoints are set and stepping is off."""
        self.vm = vm
        self.breakpoints = set()
        self.step_mode = False
        self.last_command = None

    def print_status(self):
        """Print current VM status"""
        frame = self.vm.current_frame
        print(f"\nIP: 0x{self.vm.ip:08x} | Stack: {len(frame.stack)} | "
              f"Locals: {len(frame.locals)} | Frame: {len(self.vm.call_stack)}")

        # Show current instruction
        if self.vm.ip < len(self.vm.bytecode):
            opcode = self.vm.bytecode[self.vm.ip]
            try:
                print(f"Next: {Opcode(opcode).name}")
            except ValueError:
                # Byte is not a defined opcode.
                print(f"Next: UNKNOWN(0x{opcode:02x})")

    def print_stack(self):
        """Print current stack contents"""
        stack = self.vm.current_frame.stack
        if not stack:
            print("Stack: empty")
            return

        print("Stack (top to bottom):")
        for i, value in enumerate(reversed(stack)):
            print(f" [{len(stack)-i-1}]: {value}")

    def print_locals(self):
        """Print current local variables"""
        local_values = self.vm.current_frame.locals
        if not local_values:
            print("Locals: none")
            return

        print("Local variables:")
        for i, value in enumerate(local_values):
            print(f" [{i}]: {value}")

    def print_breakpoints(self):
        """Print all breakpoints"""
        if not self.breakpoints:
            print("No breakpoints set")
            return

        print("Breakpoints:")
        for bp in sorted(self.breakpoints):
            print(f" 0x{bp:08x}")

    def disassemble_around(self, ip: int, lines_before: int = 2, lines_after: int = 5):
        """Print the disassembly of the function containing *ip*, limited to
        *lines_before*/*lines_after* instructions around it.

        The current instruction is prefixed with ``>>> ``.
        """
        dis = Disassembler(self.vm.bytecode)
        try:
            # A fresh Disassembler has empty tables; parse the image first
            # so dis.functions / dis.constants are populated.  The full
            # listing it returns is not needed here.
            dis.disassemble()
        except (ValueError, IndexError, struct.error) as e:
            print(f"Could not disassemble bytecode: {e}")
            return

        # Find current function
        current_func = None
        for func in dis.functions:
            if func['code_offset'] <= ip < func['code_offset'] + func['code_size']:
                current_func = func
                break

        if not current_func:
            print("Not in any function")
            return

        func_code = dis.disassemble_function(current_func)

        # Find current instruction in disassembly
        needle = f"0x{ip:08x}:"
        current_line = -1
        for i, line in enumerate(func_code):
            if needle in line:
                current_line = i
                break

        if current_line == -1:
            print("Could not find current instruction")
            return

        # Print surrounding lines
        start = max(0, current_line - lines_before)
        end = min(len(func_code), current_line + lines_after + 1)

        print(f"Disassembly around 0x{ip:08x}:")
        for i in range(start, end):
            marker = ">>> " if i == current_line else " "
            print(f"{marker}{func_code[i]}")

    @staticmethod
    def _parse_address(text: str) -> int:
        """Parse a breakpoint address: hex with a ``0x`` prefix, else decimal.

        Raises ``ValueError`` when *text* is not a valid number.
        """
        if text.startswith('0x'):
            return int(text, 16)
        return int(text)

    def handle_command(self, command: str) -> bool:
        """Handle debugger command

        Returns:
            ``True`` to stay at the prompt, ``False`` when the VM should
            execute (``continue``, ``step`` and ``next``).
        """
        cmd_parts = command.strip().split()
        if not cmd_parts:
            return True

        cmd = cmd_parts[0].lower()
        args = cmd_parts[1:]

        if cmd in ['c', 'continue']:
            self.step_mode = False
            return False

        elif cmd in ['s', 'step']:
            self.step_mode = True
            return False

        elif cmd in ['n', 'next']:
            # NOTE: stepping over calls is not implemented; this currently
            # behaves exactly like 'step'.
            self.step_mode = True
            return False

        elif cmd in ['si', 'stepi']:
            # Single instruction
            self.execute_single_instruction()
            return True

        elif cmd in ['b', 'break']:
            if args:
                try:
                    bp = self._parse_address(args[0])
                    self.breakpoints.add(bp)
                    print(f"Breakpoint set at 0x{bp:08x}")
                except ValueError:
                    print("Invalid breakpoint address")
            else:
                print("Usage: break <address>")

        elif cmd in ['db', 'delbreak']:
            if args:
                try:
                    bp = self._parse_address(args[0])
                    if bp in self.breakpoints:
                        self.breakpoints.remove(bp)
                        print(f"Breakpoint removed at 0x{bp:08x}")
                    else:
                        print("Breakpoint not found")
                except ValueError:
                    print("Invalid breakpoint address")
            else:
                # No address given: remove every breakpoint.
                self.breakpoints.clear()
                print("All breakpoints cleared")

        elif cmd in ['bl', 'breaklist']:
            self.print_breakpoints()

        elif cmd in ['st', 'stack']:
            self.print_stack()

        elif cmd in ['l', 'locals']:
            self.print_locals()

        elif cmd in ['d', 'disasm']:
            lines = 10
            if args:
                try:
                    lines = int(args[0])
                except ValueError:
                    pass  # keep the default on a malformed count
            self.disassemble_around(self.vm.ip, lines_after=lines)

        elif cmd in ['p', 'print']:
            if args:
                # Try to evaluate expression (simple for now)
                expr = ' '.join(args)
                if expr == 'stack':
                    self.print_stack()
                elif expr == 'locals':
                    self.print_locals()
                elif expr == 'ip':
                    print(f"IP: 0x{self.vm.ip:08x}")
                else:
                    print(f"Unknown expression: {expr}")
            else:
                self.print_status()

        elif cmd in ['h', 'help', '?']:
            self.print_help()

        elif cmd in ['q', 'quit']:
            print("Debugger exited")
            sys.exit(0)

        else:
            print(f"Unknown command: {cmd}")
            self.print_help()

        return True

    def execute_single_instruction(self):
        """Execute a single instruction"""
        if self.vm.halted:
            print("VM is halted")
            return

        old_ip = self.vm.ip
        try:
            self.vm.execute_instruction()
            print(f"Executed: 0x{old_ip:08x} -> 0x{self.vm.ip:08x}")
            self.disassemble_around(self.vm.ip, lines_before=0, lines_after=1)
        except Exception as e:
            # Interactive boundary: report any VM failure and stay alive.
            print(f"Error executing instruction at 0x{old_ip:08x}: {e}")

    def print_help(self):
        """Print debugger help"""
        help_lines = (
            "Debugger commands:",
            " c, continue - Continue execution",
            " s, step - Step into functions",
            " n, next - Step over functions",
            " si, stepi - Single instruction step",
            " b, break <addr>- Set breakpoint",
            " db, delbreak - Delete breakpoint (or all)",
            " bl, breaklist - List breakpoints",
            " st, stack - Show stack",
            " l, locals - Show local variables",
            " d, disasm [n] - Disassemble around IP",
            " p, print [expr]- Print status or expression",
            " h, help, ? - This help",
            " q, quit - Quit debugger",
        )
        for text in help_lines:
            print(text)

    def run(self):
        """Run debugger main loop

        Executes the VM until it halts, pausing for commands whenever step
        mode is on or the instruction pointer reaches a breakpoint.
        """
        print("POP VM Debugger started")
        print("Type 'help' for commands")

        while not self.vm.halted:
            # Check breakpoints
            if self.vm.ip in self.breakpoints:
                print(f"\nBreakpoint hit at 0x{self.vm.ip:08x}")
                self.step_mode = True

            if self.step_mode:
                self.print_status()
                try:
                    command = input("\ndbg> ")
                    if self.handle_command(command):
                        continue  # Stay at the prompt
                    # Otherwise the command asked for execution to proceed.
                except EOFError:
                    print("\nExiting debugger")
                    break
                except KeyboardInterrupt:
                    print("\nInterrupted")
                    self.step_mode = True
                    continue

            # Execute instruction
            try:
                self.vm.execute_instruction()
            except Exception as e:
                print(f"Runtime error at IP 0x{self.vm.ip:08x}: {e}")
                break
|
|
|
|
# ============================================================================
|
|
# VM IMPLEMENTATION
|
|
# ============================================================================
|
|
|
|
class Frame:
    """Activation record for one function call.

    Holds the index of the function being executed, the instruction pointer
    to resume at afterwards, the local-variable slots and an operand stack
    private to the call.
    """

    def __init__(self, func_idx: int, return_ip: int, arg_values: List[Value]):
        self.func_idx = func_idx
        self.return_ip = return_ip
        # Locals begin as a shallow copy of the arguments so later stores do
        # not modify the sequence the caller passed in.
        self.locals = arg_values[:]
        self.stack: List[Value] = []

    def __repr__(self):
        n_locals = len(self.locals)
        depth = len(self.stack)
        return f"Frame(func={self.func_idx}, return_ip=0x{self.return_ip:08x}, locals={n_locals}, stack={depth})"
|
|
|
|
class VM:
|
|
def __init__(self, bytecode: bytes, filename: str = ""):
    """Create a VM for *bytecode* and immediately parse its tables.

    *filename* is only used to label error messages.  All runtime state is
    initialised before ``load_bytecode()`` runs, so loading can fill the
    constant and function tables.
    """
    # Program image and where it came from.
    self.bytecode = bytecode
    self.filename = filename

    # Execution state.
    self.ip = 0
    self.halted = False
    self.call_stack: List[Frame] = []
    self.current_frame: Optional[Frame] = None

    # Tables populated by load_bytecode().
    self.constants: List[Value] = []
    self.functions: List[Dict] = []

    self.load_bytecode()
|
|
|
|
def error(self, message: str):
    """Abort with a ``RuntimeError`` tagged with file name and current IP."""
    location = f"{self.filename}:0x{self.ip:08x}"
    raise RuntimeError(f"{location}: {message}")
|
|
|
|
def load_bytecode(self):
    """Load bytecode into VM with validation

    Parses the file header, the constant pool (appending to
    ``self.constants``) and the function table (appending to
    ``self.functions``).  ``self.ip`` is used as the read cursor and is
    left just past the function table.

    Raises:
        RuntimeError: (via ``self.error``) on a bad magic number, an
            unknown constant type, or a file that ends in the middle of a
            field.
    """
    data = self.bytecode
    self.ip = 0

    def read(fmt):
        """Decode one struct field at self.ip, bounds-checked, and advance."""
        size = struct.calcsize(fmt)
        if self.ip + size > len(data):
            self.error("Unexpected end of bytecode while loading")
        (value,) = struct.unpack(fmt, data[self.ip:self.ip + size])
        self.ip += size
        return value

    # Check magic
    magic = data[self.ip:self.ip + 4]
    if magic != b'POPC':
        self.error(f"Invalid .popclass file magic: {magic}")
    self.ip += 4

    # Version (major, minor) and reserved word: read to validate presence
    # and advance the cursor; the values are not used by the VM.
    read('<H')
    read('<H')
    read('<I')

    # Load constants
    const_count = read('<I')
    for _ in range(const_count):
        const_type = read('<B')

        if const_type == 0:  # int
            # The width byte is stored in the file but integer constants
            # are always materialised as 32-bit values here.
            read('<B')
            signed = read('<B')
            if signed:
                self.constants.append(Value(TypeCode.I32, read('<i')))
            else:
                # Decode unsigned payloads as unsigned; reading them as
                # signed turned values >= 2**31 negative, which U32 rejects.
                self.constants.append(Value(TypeCode.U32, read('<I')))

        elif const_type == 1:  # float
            self.constants.append(Value(TypeCode.F32, read('<f')))

        elif const_type == 2:  # str
            length = read('<I')
            if self.ip + length > len(data):
                self.error("Unexpected end of bytecode in string constant")
            str_bytes = data[self.ip:self.ip + length]
            self.ip += length
            self.constants.append(Value(TypeCode.STR, str_bytes.decode('utf-8')))

        else:
            self.error(f"Unknown constant type: 0x{const_type:02x}")

    # Load functions
    func_count = read('<I')
    for i in range(func_count):
        read('<I')  # name constant index (not kept by the VM)
        arg_count = read('<B')
        local_count = read('<B')
        read('<H')  # Reserved
        code_size = read('<I')
        code_offset = read('<I')

        self.functions.append({
            'index': i,
            'arg_count': arg_count,
            'local_count': local_count,
            'code_offset': code_offset,
            'code_size': code_size,
        })
|
|
|
|
def fetch_byte(self) -> int:
    """Read the byte under the instruction pointer and step past it.

    Returns:
        The byte value stored at ``self.ip`` before the call.

    Reports "Unexpected end of bytecode" through ``self.error`` when the
    instruction pointer is already at or past the end of ``self.bytecode``.
    """
    exhausted = self.ip >= len(self.bytecode)
    if exhausted:
        self.error("Unexpected end of bytecode")
    byte_value = self.bytecode[self.ip]
    self.ip += 1
    return byte_value
|
|
|
|
def fetch_u16(self) -> int:
    """Read a little-endian unsigned 16-bit operand and step past it.

    Reports an error through ``self.error`` when fewer than two bytes remain
    after the instruction pointer.
    """
    start = self.ip
    if start + 2 > len(self.bytecode):
        self.error("Unexpected end of bytecode while reading u16")
    (value,) = struct.unpack('<H', self.bytecode[self.ip:self.ip + 2])
    self.ip += 2
    return value
|
|
|
|
def fetch_u32(self) -> int:
    """Read a little-endian unsigned 32-bit operand and step past it.

    Reports an error through ``self.error`` when fewer than four bytes remain
    after the instruction pointer.
    """
    start = self.ip
    if start + 4 > len(self.bytecode):
        self.error("Unexpected end of bytecode while reading u32")
    (value,) = struct.unpack('<I', self.bytecode[self.ip:self.ip + 4])
    self.ip += 4
    return value
|
|
|
|
def fetch_i32(self) -> int:
    """Read a little-endian signed 32-bit operand and step past it.

    Reports an error through ``self.error`` when fewer than four bytes remain
    after the instruction pointer.
    """
    start = self.ip
    if start + 4 > len(self.bytecode):
        self.error("Unexpected end of bytecode while reading i32")
    (value,) = struct.unpack('<i', self.bytecode[self.ip:self.ip + 4])
    self.ip += 4
    return value
|
|
|
|
def fetch_f32(self) -> float:
    """Read a little-endian 32-bit float operand and step past it.

    Reports an error through ``self.error`` when fewer than four bytes remain
    after the instruction pointer.
    """
    start = self.ip
    if start + 4 > len(self.bytecode):
        self.error("Unexpected end of bytecode while reading f32")
    (value,) = struct.unpack('<f', self.bytecode[self.ip:self.ip + 4])
    self.ip += 4
    return value
|
|
|
|
def push(self, value: Value):
    """Append ``value`` to the operand stack of the active frame."""
    operand_stack = self.current_frame.stack
    operand_stack.append(value)
|
|
|
|
def pop(self) -> Value:
    """Remove and return the top of the active frame's operand stack.

    Reports "Stack underflow" through ``self.error`` when the stack is empty.
    """
    operand_stack = self.current_frame.stack
    if len(operand_stack) == 0:
        self.error("Stack underflow")
    return operand_stack.pop()
|
|
|
|
def type_check_binary_op(self, a: Value, b: Value, op: str) -> Tuple[TypeCode, TypeCode]:
    """Decide the common operand type for a binary operation.

    Args:
        a: First operand.
        b: Second operand.
        op: Operator symbol, used for the comparison rule and in the error
            message.

    Returns:
        ``(F32, F32)`` when both operands are numeric and at least one is a
        float, ``(I32, I32)`` when both are integers, or the operands' own
        (identical) type codes for a comparison between equal non-numeric
        types.

    Any other combination is reported through ``self.error``.
    """
    numeric = (
        TypeCode.I8, TypeCode.U8, TypeCode.I16, TypeCode.U16,
        TypeCode.I32, TypeCode.U32, TypeCode.F32,
    )
    comparisons = ('==', '!=', '<', '>', '<=', '>=')
    left, right = a.type_code, b.type_code

    if left in numeric and right in numeric:
        # Any float operand promotes the whole operation to float; otherwise
        # every integer width is treated as I32.
        common = TypeCode.F32 if TypeCode.F32 in (left, right) else TypeCode.I32
        return (common, common)

    # Non-numeric operands may only be compared, and only with their own type.
    if op in comparisons and left == right:
        return (left, right)

    self.error(f"Type mismatch in {op}: {a.get_type_name()} and {b.get_type_name()}")
|
|
|
|
def run(self, entry_func: int = 0, debug: bool = False):
    """Run the loaded program starting at function ``entry_func``.

    Args:
        entry_func: Index into ``self.functions`` of the function to start in.
        debug: When true, hand control to a ``Debugger`` instead of running
            the plain fetch/execute loop.

    An index outside ``0 .. len(self.functions) - 1`` is reported through
    ``self.error``.
    """
    # A negative index must be rejected explicitly: Python indexing would
    # otherwise silently pick a function counted from the end of the table.
    if entry_func < 0 or entry_func >= len(self.functions):
        self.error(f"Invalid entry function index: {entry_func}")

    func = self.functions[entry_func]

    # The entry frame has no caller, hence the -1 return address.
    self.current_frame = Frame(entry_func, -1, [])
    self.current_frame.locals.extend(
        Value(TypeCode.I32, 0) for _ in range(func['local_count'])
    )

    self.ip = func['code_offset']
    self.call_stack = [self.current_frame]

    if debug:
        Debugger(self).run()
        return

    while not self.halted and self.ip < len(self.bytecode):
        self.execute_instruction()
|
|
|
|
def execute_instruction(self):
    """Fetch, decode and execute the single instruction at ``self.ip``.

    Operands are read from the bytecode directly after the opcode byte.
    Binary operators pop the right-hand operand first and the left-hand
    operand second.  Jump offsets are relative to the byte that follows the
    jump instruction.  Bad indices, division by zero, argument-count
    mismatches and unknown opcodes are reported through ``self.error``.

    Calling convention: CALL stores the return address on the callee's
    frame, and RET always leaves exactly one value on the caller's stack (a
    zero I32 placeholder when the callee returned nothing), so call sites
    can unconditionally consume or discard the result.
    """
    opcode = self.fetch_byte()

    if opcode == Opcode.PUSH_CONST:
        idx = self.fetch_u32()
        if idx >= len(self.constants):
            self.error(f"Constant index {idx} out of range")
        self.push(self.constants[idx])

    elif opcode == Opcode.PUSH_INT:
        width = self.fetch_byte()
        value = self.fetch_i32()

        # The width byte selects the signed integer type; anything other
        # than 8 or 16 is treated as 32-bit.
        if width == 8:
            type_code = TypeCode.I8
        elif width == 16:
            type_code = TypeCode.I16
        else:
            type_code = TypeCode.I32

        self.push(Value(type_code, value))

    elif opcode == Opcode.PUSH_FLOAT:
        value = self.fetch_f32()
        self.push(Value(TypeCode.F32, value))

    elif opcode == Opcode.PUSH_STR:
        idx = self.fetch_u32()
        if idx >= len(self.constants):
            self.error(f"Constant index {idx} out of range")
        self.push(self.constants[idx])

    elif opcode == Opcode.LOAD_LOCAL:
        idx = self.fetch_u16()
        if idx >= len(self.current_frame.locals):
            self.error(f"Local variable index {idx} out of range")
        self.push(self.current_frame.locals[idx])

    elif opcode == Opcode.STORE_LOCAL:
        idx = self.fetch_u16()
        value = self.pop()
        frame_locals = self.current_frame.locals
        if idx >= len(frame_locals):
            # Grow the locals on demand; each padding slot gets its own
            # Value object so slots never alias one another.
            missing = idx - len(frame_locals) + 1
            frame_locals.extend(Value(TypeCode.I32, 0) for _ in range(missing))
        frame_locals[idx] = value

    elif opcode == Opcode.ADD:
        rhs = self.pop()
        lhs = self.pop()
        common, _ = self.type_check_binary_op(rhs, lhs, '+')

        if common == TypeCode.F32:
            self.push(Value(TypeCode.F32, lhs.to_float() + rhs.to_float()))
        else:
            self.push(Value(TypeCode.I32, lhs.to_int() + rhs.to_int()))

    elif opcode == Opcode.SUB:
        rhs = self.pop()
        lhs = self.pop()
        common, _ = self.type_check_binary_op(rhs, lhs, '-')

        if common == TypeCode.F32:
            self.push(Value(TypeCode.F32, lhs.to_float() - rhs.to_float()))
        else:
            self.push(Value(TypeCode.I32, lhs.to_int() - rhs.to_int()))

    elif opcode == Opcode.MUL:
        rhs = self.pop()
        lhs = self.pop()
        common, _ = self.type_check_binary_op(rhs, lhs, '*')

        if common == TypeCode.F32:
            self.push(Value(TypeCode.F32, lhs.to_float() * rhs.to_float()))
        else:
            self.push(Value(TypeCode.I32, lhs.to_int() * rhs.to_int()))

    elif opcode == Opcode.DIV:
        rhs = self.pop()
        lhs = self.pop()
        common, _ = self.type_check_binary_op(rhs, lhs, '/')

        # Test the divisor in the domain the division is carried out in.
        # Checking the integer view of a float divisor would presumably
        # reject fractional divisors such as 0.5.
        if common == TypeCode.F32:
            divisor = rhs.to_float()
            if divisor == 0:
                self.error("Division by zero")
            self.push(Value(TypeCode.F32, lhs.to_float() / divisor))
        else:
            divisor = rhs.to_int()
            if divisor == 0:
                self.error("Division by zero")
            # NOTE(review): ``//`` floors toward negative infinity, unlike
            # C/Java truncation - confirm the intended semantics.
            self.push(Value(TypeCode.I32, lhs.to_int() // divisor))

    elif opcode == Opcode.MOD:
        rhs = self.pop()
        lhs = self.pop()
        self.type_check_binary_op(rhs, lhs, '%')

        divisor = rhs.to_int()
        if divisor == 0:
            self.error("Modulo by zero")

        self.push(Value(TypeCode.I32, lhs.to_int() % divisor))

    elif opcode == Opcode.NEG:
        operand = self.pop()
        if operand.type_code == TypeCode.F32:
            self.push(Value(TypeCode.F32, -operand.to_float()))
        else:
            self.push(Value(TypeCode.I32, -operand.to_int()))

    elif opcode == Opcode.BIT_AND:
        rhs = self.pop()
        lhs = self.pop()
        self.push(Value(TypeCode.I32, lhs.to_int() & rhs.to_int()))

    elif opcode == Opcode.BIT_OR:
        rhs = self.pop()
        lhs = self.pop()
        self.push(Value(TypeCode.I32, lhs.to_int() | rhs.to_int()))

    elif opcode == Opcode.BIT_XOR:
        rhs = self.pop()
        lhs = self.pop()
        self.push(Value(TypeCode.I32, lhs.to_int() ^ rhs.to_int()))

    elif opcode == Opcode.FADD:
        rhs = self.pop()
        lhs = self.pop()
        self.push(Value(TypeCode.F32, lhs.to_float() + rhs.to_float()))

    elif opcode == Opcode.FSUB:
        rhs = self.pop()
        lhs = self.pop()
        self.push(Value(TypeCode.F32, lhs.to_float() - rhs.to_float()))

    elif opcode == Opcode.FMUL:
        rhs = self.pop()
        lhs = self.pop()
        self.push(Value(TypeCode.F32, lhs.to_float() * rhs.to_float()))

    elif opcode == Opcode.FDIV:
        rhs = self.pop()
        lhs = self.pop()
        divisor = rhs.to_float()
        if divisor == 0:
            self.error("Division by zero")
        self.push(Value(TypeCode.F32, lhs.to_float() / divisor))

    elif opcode == Opcode.FNEG:
        operand = self.pop()
        self.push(Value(TypeCode.F32, -operand.to_float()))

    elif opcode == Opcode.CMP_EQ:
        # Equality compares the raw payloads, without a type check.
        rhs = self.pop()
        lhs = self.pop()
        self.push(Value(TypeCode.BOOL, lhs.data == rhs.data))

    elif opcode == Opcode.CMP_NEQ:
        rhs = self.pop()
        lhs = self.pop()
        self.push(Value(TypeCode.BOOL, lhs.data != rhs.data))

    elif opcode == Opcode.CMP_LT:
        rhs = self.pop()
        lhs = self.pop()
        common, _ = self.type_check_binary_op(rhs, lhs, '<')

        if common == TypeCode.F32:
            result = lhs.to_float() < rhs.to_float()
        else:
            result = lhs.to_int() < rhs.to_int()
        self.push(Value(TypeCode.BOOL, result))

    elif opcode == Opcode.CMP_GT:
        rhs = self.pop()
        lhs = self.pop()
        common, _ = self.type_check_binary_op(rhs, lhs, '>')

        if common == TypeCode.F32:
            result = lhs.to_float() > rhs.to_float()
        else:
            result = lhs.to_int() > rhs.to_int()
        self.push(Value(TypeCode.BOOL, result))

    elif opcode == Opcode.CMP_LE:
        rhs = self.pop()
        lhs = self.pop()
        common, _ = self.type_check_binary_op(rhs, lhs, '<=')

        if common == TypeCode.F32:
            result = lhs.to_float() <= rhs.to_float()
        else:
            result = lhs.to_int() <= rhs.to_int()
        self.push(Value(TypeCode.BOOL, result))

    elif opcode == Opcode.CMP_GE:
        rhs = self.pop()
        lhs = self.pop()
        common, _ = self.type_check_binary_op(rhs, lhs, '>=')

        if common == TypeCode.F32:
            result = lhs.to_float() >= rhs.to_float()
        else:
            result = lhs.to_int() >= rhs.to_int()
        self.push(Value(TypeCode.BOOL, result))

    elif opcode == Opcode.JMP:
        offset = self.fetch_i32()
        self.ip += offset

    elif opcode == Opcode.JMP_IF:
        offset = self.fetch_i32()
        cond = self.pop()
        if cond.to_bool():
            self.ip += offset

    elif opcode == Opcode.JMP_IF_NOT:
        offset = self.fetch_i32()
        cond = self.pop()
        if not cond.to_bool():
            self.ip += offset

    elif opcode == Opcode.CALL:
        func_idx = self.fetch_u16()
        arg_count = self.fetch_byte()

        if func_idx >= len(self.functions):
            self.error(f"Function index {func_idx} out of range")

        # Arguments were pushed left to right, so they pop off in reverse.
        args = [self.pop() for _ in range(arg_count)]
        args.reverse()

        func = self.functions[func_idx]
        if len(args) != func['arg_count']:
            self.error(f"Function expects {func['arg_count']} arguments, got {len(args)}")

        # The return address lives on the callee's frame; RET reads it from
        # the frame it pops.
        return_ip = self.ip
        new_frame = Frame(func_idx, return_ip, args)

        # The arguments occupy the first local slots; zero-fill the rest.
        for _ in range(func['local_count'] - arg_count):
            new_frame.locals.append(Value(TypeCode.I32, 0))

        self.call_stack.append(new_frame)
        self.current_frame = new_frame
        self.ip = func['code_offset']

    elif opcode == Opcode.RET:
        has_value = self.fetch_byte()
        return_value = self.pop() if has_value else None

        finished_frame = self.call_stack.pop()

        if not self.call_stack:
            # Returned from the entry function: stop the machine.
            self.halted = True
            if return_value is not None:
                print(f"Program returned: {return_value.data}")
            return

        self.current_frame = self.call_stack[-1]
        # Resume right after the CALL.  The address was recorded on the
        # frame that just finished, not on the caller's frame (whose
        # return_ip describes where the caller itself returns to).
        self.ip = finished_frame.return_ip

        # Every call site consumes or discards exactly one result, so a
        # value-less return still has to leave a placeholder behind.
        if return_value is None:
            return_value = Value(TypeCode.I32, 0)
        self.push(return_value)

    elif opcode == Opcode.DUP:
        if not self.current_frame.stack:
            self.error("Cannot DUP from empty stack")
        top = self.current_frame.stack[-1]
        # Push a fresh Value so the two stack entries do not alias.
        self.push(Value(top.type_code, top.data))

    elif opcode == Opcode.POP:
        self.pop()

    elif opcode == Opcode.PRINT:
        value = self.pop()
        print(value.data)

    elif opcode == Opcode.HALT:
        self.halted = True

    else:
        self.error(f"Unknown opcode: 0x{opcode:02X}")
|
|
|
|
# ============================================================================
|
|
# COMPILER WITH PROPER FOR LOOP SUPPORT
|
|
# ============================================================================
|
|
|
|
class Compiler:
|
|
def __init__(self, source: str, filename: str = ""):
    """Set up a compiler for ``source``.

    Args:
        source: Program text to compile.
        filename: Name used as the prefix of error messages.
    """
    self.filename = filename
    self.lexer = Lexer(source, filename)
    # One token of lookahead, primed with the first token of the input.
    self.current_token = self.lexer.next_token()

    # Program-wide output being accumulated.
    self.constants: List[Tuple[TypeCode, Any]] = []
    self.functions: List[Dict] = []

    # State of the function currently being compiled.
    self.current_function: Optional[Dict] = None
    self.local_vars: Dict[str, int] = {}
    self.local_count = 0
    self.loop_stack: List[Dict] = []  # For break/continue in loops
|
|
|
|
def error(self, msg: str):
    """Abort compilation with ``msg`` prefixed by the current source position.

    Raises:
        SyntaxError: Always; the message has the form
            ``filename:line:col: msg`` using the current token's position.
    """
    token = self.current_token
    location = f"{self.filename}:{token.line}:{token.col}"
    raise SyntaxError(f"{location}: {msg}")
|
|
|
|
def expect(self, token_type: TokenType, context: str = ""):
    """Consume the current token, which must be of type ``token_type``.

    Args:
        token_type: The token type required at this point.
        context: Optional description of the construct being parsed; it is
            woven into the error message.

    Raises:
        SyntaxError: Via ``self.error`` when the current token has a
            different type.
    """
    found = self.current_token.type
    if found != token_type:
        where = f" in {context}" if context else ""
        self.error(f"Expected {token_type.name}{where}, but got {found.name}")
    self.current_token = self.lexer.next_token()
|
|
|
|
def eat(self, token_type: TokenType):
    """Consume a token of ``token_type``; shorthand for a context-free ``expect``."""
    self.expect(token_type, context="")
|
|
|
|
def add_constant(self, type_code: TypeCode, value: Any) -> int:
    """Return the constant-pool index for ``(type_code, value)``.

    An existing entry with an equal type code and an equal value is reused;
    otherwise the pair is appended and the new index is returned.
    """
    for index, existing in enumerate(self.constants):
        existing_type, existing_value = existing
        if existing_type == type_code and existing_value == value:
            return index

    new_index = len(self.constants)
    self.constants.append((type_code, value))
    return new_index
|
|
|
|
def emit(self, *bytes_data):
    """Append raw bytes to the code of the function being compiled.

    Args:
        *bytes_data: Each item is either a single byte given as an int in
            ``0..255`` (opcodes included) or a ``bytes``/``bytearray`` chunk
            that is appended byte by byte.

    Raises:
        ValueError: If an int item does not fit into one byte.
        TypeError: If an item is neither an int nor a bytes-like chunk.
            Silently skipping such an item would shift every following
            byte and corrupt the instruction stream.
    """
    code = self.current_function['code']
    for item in bytes_data:
        if isinstance(item, int):
            if not 0 <= item <= 0xFF:
                raise ValueError(f"Cannot emit {item!r}: value does not fit in one byte")
            code.append(item)
        elif isinstance(item, (bytes, bytearray)):
            code.extend(item)
        else:
            raise TypeError(f"Cannot emit {type(item).__name__} value {item!r} as bytecode")
|
|
|
|
def compile(self) -> bytes:
    """Compile the whole source and return the generated bytecode.

    The top level may only contain function definitions.

    Raises:
        SyntaxError: For every compilation failure.  Exceptions of other
            types are re-reported through ``self.error`` so that they carry
            the current source position.
    """
    try:
        while self.current_token.type != TokenType.EOF:
            if self.current_token.type != TokenType.FUN:
                self.error("Expected function definition")
            self.compile_function()

        return self.generate_bytecode()
    except SyntaxError:
        # Already carries position information.
        raise
    except Exception as exc:
        self.error(str(exc))
|
|
|
|
def compile_function(self):
    """Compile one ``fun name(type param, ...) { ... }`` definition.

    The function record is appended to ``self.functions`` before its body is
    compiled, so the body can call the function itself.  Parameters occupy
    the first local slots in declaration order.  A value-less return is
    always appended to the code so that control can never run past the end
    of the function.

    Raises:
        SyntaxError: Via ``self.error`` / ``self.expect`` on malformed input
            or when two parameters share a name.
    """
    self.expect(TokenType.FUN, "function definition")

    func_name = self.current_token.value
    self.expect(TokenType.IDENTIFIER, "function name")

    self.expect(TokenType.LPAREN, "function parameter list")

    # Parse parameters
    params = []
    seen_names = set()
    while self.current_token.type != TokenType.RPAREN:
        param_type = self.parse_type()
        param_name = self.current_token.value
        self.expect(TokenType.IDENTIFIER, "parameter name")
        if param_name in seen_names:
            # A repeated name would silently shadow the earlier slot.
            self.error(f"Duplicate parameter '{param_name}'")
        seen_names.add(param_name)
        params.append((param_type, param_name))

        if self.current_token.type == TokenType.COMMA:
            self.eat(TokenType.COMMA)

    self.expect(TokenType.RPAREN, "function parameter list")

    # Set up function
    self.current_function = {
        'name': func_name,
        'arg_count': len(params),
        'code': [],
        'labels': {},  # For break/continue labels
    }
    # Register before compiling the body: call sites look functions up in
    # self.functions, so a recursive call needs the entry to exist already.
    self.functions.append(self.current_function)

    # Parameters become the first locals
    self.local_vars = {}
    self.local_count = 0
    for _, param_name in params:
        self.local_vars[param_name] = self.local_count
        self.local_count += 1

    # Parse body
    self.expect(TokenType.LBRACE, "function body")
    while self.current_token.type != TokenType.RBRACE:
        self.compile_statement()
    self.expect(TokenType.RBRACE, "function body")

    # Always terminate with a value-less return.  The final byte of the
    # code is an operand (RET itself is followed by its flag byte), so it
    # cannot tell whether the body already ends in a return; after an
    # explicit trailing return these two bytes are merely unreachable.
    self.emit(Opcode.RET, 0)

    self.current_function['local_count'] = self.local_count
    self.current_function = None
|
|
|
|
def parse_type(self) -> TypeCode:
    """Consume a type keyword and return the matching ``TypeCode``.

    ``uint`` and ``int`` are aliases for the 32-bit unsigned and signed
    integer types.

    Raises:
        SyntaxError: Via ``self.error`` when the current token is not a type
            keyword.
    """
    keyword_to_type = {
        TokenType.U8: TypeCode.U8,
        TokenType.U16: TypeCode.U16,
        TokenType.U32: TypeCode.U32,
        TokenType.I8: TypeCode.I8,
        TokenType.I16: TypeCode.I16,
        TokenType.I32: TypeCode.I32,
        TokenType.FLOAT: TypeCode.F32,
        TokenType.BOOL: TypeCode.BOOL,
        TokenType.CHAR: TypeCode.CHAR,
        TokenType.STR: TypeCode.STR,
        TokenType.UINT: TypeCode.U32,
        TokenType.INT: TypeCode.I32,
    }

    keyword = self.current_token.type
    if keyword not in keyword_to_type:
        self.error(f"Expected type specification, got {self.current_token.type.name}")

    resolved = keyword_to_type[keyword]
    self.current_token = self.lexer.next_token()
    return resolved
|
|
|
|
def compile_statement(self):
    """Compile one statement, chosen by the type of the current token.

    Raises:
        SyntaxError: When no statement can start with the current token, or
            when a handler fails; failures of other exception types are
            re-reported through ``self.error`` with an
            "Error in statement" prefix.
    """
    declaration_keywords = (
        TokenType.U8, TokenType.U16, TokenType.U32,
        TokenType.I8, TokenType.I16, TokenType.I32,
        TokenType.FLOAT, TokenType.BOOL, TokenType.CHAR,
        TokenType.STR, TokenType.UINT, TokenType.INT,
    )
    try:
        # Every type keyword starts a variable declaration; the remaining
        # statement kinds are recognised by their first token.
        handlers = dict.fromkeys(declaration_keywords, self.compile_var_declaration)
        handlers.update({
            TokenType.IF: self.compile_if_statement,
            TokenType.WHILE: self.compile_while_statement,
            TokenType.FOR: self.compile_for_statement,
            TokenType.RETURN: self.compile_return_statement,
            TokenType.LBRACE: self.compile_block,
            TokenType.IDENTIFIER: self.compile_assignment_or_call,
        })

        handler = handlers.get(self.current_token.type)
        if handler is None:
            self.error(f"Unexpected statement starting with {self.current_token.type.name}")
        handler()
    except SyntaxError:
        # Already carries position information.
        raise
    except Exception as exc:
        self.error(f"Error in statement: {exc}")
|
|
|
|
def compile_block(self):
    """Compile a ``{ ... }`` block with its own variable scope.

    Names declared inside the block are forgotten when it ends.  Their slot
    numbers are deliberately not recycled: ``self.local_count`` is what the
    enclosing function later records as its number of locals, so rewinding
    it here would make that figure smaller than the highest slot the code
    actually uses, and a later variable reusing the slot would start out
    holding the previous variable's value.
    """
    self.expect(TokenType.LBRACE, "block start")

    # Remember the visible names so the block's declarations can be dropped.
    outer_scope = self.local_vars.copy()

    while self.current_token.type != TokenType.RBRACE:
        self.compile_statement()

    self.expect(TokenType.RBRACE, "block end")

    # Restore the name table only (block scoping); the slot counter keeps
    # growing so every declared variable owns a distinct, counted slot.
    self.local_vars = outer_scope
|
|
|
|
def compile_var_declaration(self):
    """Compile ``type name;`` or ``type name = expression;``.

    The name is bound to the next free local slot.  Code is emitted only
    when an initialiser is present (expression followed by a store).  The
    declared type is parsed but not checked against the initialiser.

    Raises:
        SyntaxError: Via ``self.error`` when the name is already visible.
    """
    _declared_type = self.parse_type()
    name_token = self.current_token
    self.expect(TokenType.IDENTIFIER, "variable name")
    var_name = name_token.value

    # Check for redeclaration
    if var_name in self.local_vars:
        self.error(f"Redeclaration of variable '{var_name}'")

    # Claim the next slot for the new variable
    slot = self.local_count
    self.local_vars[var_name] = slot
    self.local_count = slot + 1

    if self.current_token.type == TokenType.ASSIGN:
        self.eat(TokenType.ASSIGN)
        self.compile_expression()
        # TODO: Add type checking for assignment
        self.emit(Opcode.STORE_LOCAL, *struct.pack('<H', slot))

    self.expect(TokenType.SEMICOLON, "variable declaration")
|
|
|
|
def compile_assignment_or_call(self):
    """Compile a statement that starts with an identifier.

    Handles a call statement (``f(...);``), a plain or compound assignment
    (``x = e;``, ``x += e;`` ...) and a postfix step (``x++;``, ``x--;``).
    The result of a call other than the builtin ``print`` is discarded.

    Raises:
        SyntaxError: Via ``self.error`` for an undefined variable or an
            unexpected token after the identifier.
    """
    name = self.current_token.value
    self.expect(TokenType.IDENTIFIER, "identifier")
    follower = self.current_token.type

    # --- call statement -------------------------------------------------
    if follower == TokenType.LPAREN:
        self.compile_call(name)
        # print emits no result, every other call leaves one to discard.
        if name != 'print':
            self.emit(Opcode.POP)  # Discard return value
        self.expect(TokenType.SEMICOLON, "function call")
        return

    step_ops = {
        TokenType.INCREMENT: Opcode.ADD,
        TokenType.DECREMENT: Opcode.SUB,
    }
    compound_ops = {
        TokenType.PLUS_ASSIGN: Opcode.ADD,
        TokenType.MINUS_ASSIGN: Opcode.SUB,
        TokenType.STAR_ASSIGN: Opcode.MUL,
        TokenType.SLASH_ASSIGN: Opcode.DIV,
    }
    is_step = follower in step_ops
    is_compound = follower in compound_ops
    if not (follower == TokenType.ASSIGN or is_step or is_compound):
        self.error(f"Unexpected token after identifier: {self.current_token.type.name}")

    if name not in self.local_vars:
        self.error(f"Undefined variable '{name}'")
    slot = self.local_vars[name]

    self.eat(follower)

    # --- x++; / x--; ------------------------------------------------------
    if is_step:
        self.emit(Opcode.LOAD_LOCAL, *struct.pack('<H', slot))
        self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 1))
        self.emit(step_ops[follower])
        self.emit(Opcode.STORE_LOCAL, *struct.pack('<H', slot))
        self.expect(TokenType.SEMICOLON, "increment/decrement statement")
        return

    # --- x = e; / x op= e; ------------------------------------------------
    if is_compound:
        # Current value first, so it ends up as the left-hand operand.
        self.emit(Opcode.LOAD_LOCAL, *struct.pack('<H', slot))
        self.compile_expression()
        self.emit(compound_ops[follower])
    else:
        self.compile_expression()

    self.emit(Opcode.STORE_LOCAL, *struct.pack('<H', slot))
    self.expect(TokenType.SEMICOLON, "assignment")
|
|
|
|
def compile_if_statement(self):
    """Compile ``if (cond) stmt`` with an optional ``else stmt``.

    A conditional jump with a placeholder offset is emitted after the
    condition and patched once the size of the then-branch is known; with
    an ``else`` an unconditional jump at the end of the then-branch skips
    the else-branch.  Offsets are relative to the end of the 5-byte jump
    instruction.
    """
    self.expect(TokenType.IF, "if statement")
    self.expect(TokenType.LPAREN, "if condition")
    self.compile_expression()
    self.expect(TokenType.RPAREN, "if condition")

    code = self.current_function['code']

    # Conditional jump over the then-branch (offset patched below).
    skip_then_at = len(code)
    self.emit(Opcode.JMP_IF_NOT, 0, 0, 0, 0)  # Placeholder

    self.compile_statement()

    if self.current_token.type != TokenType.ELSE:
        # No else: a false condition lands right behind the then-branch.
        distance = len(code) - (skip_then_at + 5)
        code[skip_then_at + 1:skip_then_at + 5] = struct.pack('<i', distance)
        return

    # The then-branch must hop over the else-branch when it finishes.
    skip_else_at = len(code)
    self.emit(Opcode.JMP, 0, 0, 0, 0)  # Placeholder

    # A false condition lands at the start of the else-branch.
    distance = len(code) - (skip_then_at + 5)
    code[skip_then_at + 1:skip_then_at + 5] = struct.pack('<i', distance)

    self.eat(TokenType.ELSE)
    self.compile_statement()

    distance = len(code) - (skip_else_at + 5)
    code[skip_else_at + 1:skip_else_at + 5] = struct.pack('<i', distance)
|
|
|
|
def compile_while_statement(self):
    """Compile ``while (cond) stmt``.

    Emitted layout: condition, conditional exit jump, body, jump back to the
    condition.  A loop-context record is kept on ``self.loop_stack`` while
    the loop is being compiled.
    """
    self.expect(TokenType.WHILE, "while loop")

    code = self.current_function['code']

    # Push loop context for break/continue
    condition_at = len(code)
    context = {'start': condition_at, 'end_placeholder': -1}
    self.loop_stack.append(context)

    self.expect(TokenType.LPAREN, "while condition")
    self.compile_expression()
    self.expect(TokenType.RPAREN, "while condition")

    # Exit jump, taken when the condition is false (offset patched below).
    exit_jump_at = len(code)
    self.emit(Opcode.JMP_IF_NOT, 0, 0, 0, 0)  # Placeholder

    self.compile_statement()

    # Back edge: the offset is relative to the end of this 5-byte jump.
    back_distance = condition_at - (len(code) + 5)
    self.emit(Opcode.JMP, *struct.pack('<i', back_distance))

    after_loop = len(code)
    exit_distance = after_loop - (exit_jump_at + 5)
    code[exit_jump_at + 1:exit_jump_at + 5] = struct.pack('<i', exit_distance)

    # Record the end position, then leave the loop context.
    self.loop_stack[-1]['end'] = after_loop
    self.loop_stack.pop()
|
|
|
|
def compile_for_statement(self):
    """Compile a C-style ``for (init; cond; incr) stmt`` loop.

    The header is parsed in source order, but the increment has to run
    after the body, so the emitted code is laid out as::

        init
      cond:  <condition>            (a constant 1 when omitted)
             JMP_IF_NOT end
             JMP body
      incr:  <increment>; POP       (empty when omitted)
             JMP cond
      body:  <statement>
             JMP incr
      end:

    All jump offsets are relative to the end of the 5-byte jump instruction.

    NOTE(review): a variable declared in the init clause stays visible after
    the loop - confirm whether loop-local scoping is wanted.
    """
    self.expect(TokenType.FOR, "for loop")
    self.expect(TokenType.LPAREN, "for loop header")

    # Push loop context
    self.loop_stack.append({'start_placeholder': -1, 'increment_placeholder': -1, 'end_placeholder': -1})

    code = self.current_function['code']
    declaration_keywords = (
        TokenType.U8, TokenType.U16, TokenType.U32,
        TokenType.I8, TokenType.I16, TokenType.I32,
        TokenType.FLOAT, TokenType.BOOL, TokenType.CHAR,
        TokenType.STR, TokenType.UINT, TokenType.INT,
    )

    # 1. Initialization (optional)
    if self.current_token.type == TokenType.SEMICOLON:
        self.eat(TokenType.SEMICOLON)
    elif self.current_token.type in declaration_keywords:
        # A declaration consumes its own terminating semicolon.
        self.compile_var_declaration()
    else:
        self.compile_expression()
        self.emit(Opcode.POP)  # Discard expression result
        self.expect(TokenType.SEMICOLON, "for loop initialization")

    # 2. Condition (optional)
    condition_start = len(code)
    if self.current_token.type != TokenType.SEMICOLON:
        self.compile_expression()
    else:
        # No condition means always true
        self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 1))
    self.expect(TokenType.SEMICOLON, "for loop condition")

    # Leave the loop when the condition is false (patched at the end) ...
    exit_jump_pos = len(code)
    self.emit(Opcode.JMP_IF_NOT, 0, 0, 0, 0)  # Placeholder for end of loop
    # ... otherwise skip the increment code, which sits before the body.
    body_jump_pos = len(code)
    self.emit(Opcode.JMP, 0, 0, 0, 0)  # Placeholder for start of body

    # 3. Increment (optional)
    increment_start = len(code)
    if self.current_token.type != TokenType.RPAREN:
        self.compile_expression()
        self.emit(Opcode.POP)  # Discard the increment expression's value
    self.expect(TokenType.RPAREN, "for loop header")

    # After the increment the condition is evaluated again.
    offset = condition_start - (len(code) + 5)
    self.emit(Opcode.JMP, *struct.pack('<i', offset))

    # Store the increment position in loop context
    self.loop_stack[-1]['increment_placeholder'] = increment_start

    # 4. Loop body
    body_start = len(code)
    offset = body_start - (body_jump_pos + 5)
    code[body_jump_pos + 1:body_jump_pos + 5] = struct.pack('<i', offset)

    self.compile_statement()

    # Each pass through the body continues with the increment.
    offset = increment_start - (len(code) + 5)
    self.emit(Opcode.JMP, *struct.pack('<i', offset))

    # Patch JMP_IF_NOT to jump after loop
    after_loop = len(code)
    offset = after_loop - (exit_jump_pos + 5)
    code[exit_jump_pos + 1:exit_jump_pos + 5] = struct.pack('<i', offset)

    # Update loop context
    self.loop_stack[-1]['end_placeholder'] = after_loop
    self.loop_stack.pop()
|
|
|
|
def compile_return_statement(self):
    """Compile ``return;`` or ``return expression;``.

    Emits the return opcode followed by a flag byte: 1 when a value was
    compiled, 0 otherwise.
    """
    self.expect(TokenType.RETURN, "return statement")

    returns_value = self.current_token.type != TokenType.SEMICOLON
    if returns_value:
        self.compile_expression()
    self.emit(Opcode.RET, 1 if returns_value else 0)

    self.expect(TokenType.SEMICOLON, "return statement")
|
|
|
|
def compile_call(self, func_name: str):
    """Compile the argument list and the call for ``func_name``.

    The current token must be the opening parenthesis.  Arguments are
    compiled left to right.  The builtin ``print`` takes exactly one
    argument and compiles to the print opcode; any other name must match a
    function already present in ``self.functions`` and the number of
    arguments must agree with its declaration.

    Raises:
        SyntaxError: Via ``self.error`` for an unknown function or a wrong
            number of arguments.
    """
    self.expect(TokenType.LPAREN, "function call arguments")

    supplied = 0
    while self.current_token.type != TokenType.RPAREN:
        self.compile_expression()
        supplied += 1
        if self.current_token.type == TokenType.COMMA:
            self.eat(TokenType.COMMA)

    self.expect(TokenType.RPAREN, "function call arguments")

    # Builtin: print
    if func_name == 'print':
        if supplied != 1:
            self.error("print() expects exactly 1 argument")
        self.emit(Opcode.PRINT)
        return

    # User-defined function: the first declaration with that name wins.
    func_idx = next(
        (index for index, candidate in enumerate(self.functions)
         if candidate['name'] == func_name),
        None,
    )
    if func_idx is None:
        self.error(f"Undefined function '{func_name}'")

    expected = self.functions[func_idx]['arg_count']
    if supplied != expected:
        self.error(f"Function '{func_name}' expects {expected} arguments, got {supplied}")

    self.emit(Opcode.CALL, *struct.pack('<H', func_idx), supplied)
|
|
|
|
def compile_expression(self):
    """Compile a full expression, entering the precedence chain at its lowest level."""
    return self.compile_assignment_expression()
|
|
|
|
def compile_assignment_expression(self):
    """Compile the assignment level of the expression grammar.

    Assignment inside an expression is not implemented here; this level
    simply forwards to the logical-OR level.
    """
    return self.compile_or_expression()
|
|
|
|
def compile_or_expression(self):
    """Compile a chain of logical-OR operands.

    The bytecode combines operands with a bitwise OR, so every operand is
    first reduced to a boolean by comparing it against zero.  That makes the
    result depend on truth values only (and always 0 or 1) instead of on
    the operands' integer bit patterns.  Both operands are always
    evaluated; there is no short-circuiting.
    """
    self.compile_and_expression()

    left_is_boolean = False
    while self.current_token.type == TokenType.OR:
        if not left_is_boolean:
            # Reduce the left operand to "value != 0".
            self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 0))
            self.emit(Opcode.CMP_NEQ)

        self.eat(TokenType.OR)
        self.compile_and_expression()
        # Reduce the right operand to "value != 0".
        self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 0))
        self.emit(Opcode.CMP_NEQ)

        self.emit(Opcode.BIT_OR)
        # The combined result is already 0 or 1 for the next round.
        left_is_boolean = True
|
|
|
|
def compile_and_expression(self):
    """Compile a chain of logical-AND operands.

    The bytecode combines operands with a bitwise AND, so every operand is
    first reduced to a boolean by comparing it against zero.  Without that
    step two non-zero operands with no common bits (for example 2 and 1)
    would combine to 0, i.e. false.  Both operands are always evaluated;
    there is no short-circuiting.
    """
    self.compile_equality()

    left_is_boolean = False
    while self.current_token.type == TokenType.AND:
        if not left_is_boolean:
            # Reduce the left operand to "value != 0".
            self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 0))
            self.emit(Opcode.CMP_NEQ)

        self.eat(TokenType.AND)
        self.compile_equality()
        # Reduce the right operand to "value != 0".
        self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 0))
        self.emit(Opcode.CMP_NEQ)

        self.emit(Opcode.BIT_AND)
        # The combined result is already 0 or 1 for the next round.
        left_is_boolean = True
|
|
|
|
def compile_equality(self):
    """Compile a left-associative chain of ``==`` / ``!=`` operations."""
    self.compile_comparison()

    while True:
        operator_token = self.current_token.type
        if operator_token not in (TokenType.EQ, TokenType.NEQ):
            break
        self.eat(operator_token)
        self.compile_comparison()
        is_equal_test = operator_token == TokenType.EQ
        self.emit(Opcode.CMP_EQ if is_equal_test else Opcode.CMP_NEQ)
|
|
|
|
def compile_comparison(self):
    """Compile a left-associative chain of ``<``, ``>``, ``<=``, ``>=`` operations."""
    opcode_for = {
        TokenType.LT: Opcode.CMP_LT,
        TokenType.GT: Opcode.CMP_GT,
        TokenType.LE: Opcode.CMP_LE,
        TokenType.GE: Opcode.CMP_GE,
    }

    self.compile_term()

    while self.current_token.type in opcode_for:
        operator_token = self.current_token.type
        self.eat(operator_token)
        self.compile_term()
        self.emit(opcode_for[operator_token])
|
|
|
|
def compile_term(self):
    """Compile a left-associative chain of ``+`` / ``-`` operations."""
    self.compile_factor()

    while True:
        operator_token = self.current_token.type
        if operator_token not in (TokenType.PLUS, TokenType.MINUS):
            break
        self.eat(operator_token)
        self.compile_factor()
        is_addition = operator_token == TokenType.PLUS
        self.emit(Opcode.ADD if is_addition else Opcode.SUB)
|
|
|
|
def compile_factor(self):
    """Compile a left-associative chain of ``*``, ``/``, ``%`` operations."""
    opcode_for = {
        TokenType.STAR: Opcode.MUL,
        TokenType.SLASH: Opcode.DIV,
        TokenType.PERCENT: Opcode.MOD,
    }

    self.compile_unary()

    while self.current_token.type in opcode_for:
        operator_token = self.current_token.type
        self.eat(operator_token)
        self.compile_unary()
        self.emit(opcode_for[operator_token])
|
|
|
|
def compile_unary(self):
    """Compile unary ``-`` and ``!`` as well as prefix ``++`` / ``--``.

    Anything that does not start with one of these operators is handed to
    ``compile_primary``.  A prefix step updates the variable and leaves the
    new value on the stack.

    Raises:
        SyntaxError: Via ``self.error`` when a prefix step is not followed
            by the name of a known variable.
    """
    op = self.current_token.type
    if op not in (TokenType.MINUS, TokenType.NOT, TokenType.INCREMENT, TokenType.DECREMENT):
        self.compile_primary()
        return

    self.eat(op)

    if op == TokenType.MINUS:
        self.compile_unary()
        self.emit(Opcode.NEG)
        return

    if op == TokenType.NOT:
        self.compile_unary()
        # Logical NOT: the operand is compared for equality with 0.
        self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 0))
        self.emit(Opcode.CMP_EQ)
        return

    # Prefix ++/--: only a plain variable can be stepped.
    if self.current_token.type != TokenType.IDENTIFIER:
        self.error("Prefix increment/decrement requires a variable")

    var_name = self.current_token.value
    if var_name not in self.local_vars:
        self.error(f"Undefined variable '{var_name}'")
    slot = self.local_vars[var_name]

    # new value = old value +/- 1
    self.emit(Opcode.LOAD_LOCAL, *struct.pack('<H', slot))
    self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 1))
    self.emit(Opcode.ADD if op == TokenType.INCREMENT else Opcode.SUB)

    # Keep one copy as the expression's value, store the other one back.
    self.emit(Opcode.DUP)
    self.emit(Opcode.STORE_LOCAL, *struct.pack('<H', slot))

    self.eat(TokenType.IDENTIFIER)
|
|
|
|
def compile_primary(self):
    """Compile a primary expression and emit code that pushes its value.

    Handled forms, chosen by the current token:

    * integer literal  -> PUSH_INT with width byte 32 and an int32 payload
    * float literal    -> PUSH_FLOAT with a float32 payload
    * boolean literal  -> PUSH_INT with width byte 8 and payload 1 or 0
    * string literal   -> added to the constant pool, then PUSH_STR <index>
    * identifier       -> postfix ``++`` / ``--``, a call (handed to
      ``compile_call``), or a plain LOAD_LOCAL
    * ``( expr )``     -> the parenthesised expression

    Any other token is reported through ``self.error``.
    """
    token = self.current_token
    kind = token.type

    if kind == TokenType.INT_LITERAL:
        self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', token.value))
        self.eat(TokenType.INT_LITERAL)
        return

    if kind == TokenType.FLOAT_LITERAL:
        self.emit(Opcode.PUSH_FLOAT, *struct.pack('<f', token.value))
        self.eat(TokenType.FLOAT_LITERAL)
        return

    if kind == TokenType.BOOL_LITERAL:
        flag = 1 if token.value else 0
        self.emit(Opcode.PUSH_INT, 8, *struct.pack('<i', flag))
        self.eat(TokenType.BOOL_LITERAL)
        return

    if kind == TokenType.STRING_LITERAL:
        const_idx = self.add_constant(TypeCode.STR, token.value)
        self.emit(Opcode.PUSH_STR, *struct.pack('<I', const_idx))
        self.eat(TokenType.STRING_LITERAL)
        return

    if kind == TokenType.IDENTIFIER:
        name = token.value
        self.eat(TokenType.IDENTIFIER)

        # The token after the identifier decides how it is used.
        follower = self.current_token.type

        if follower in (TokenType.INCREMENT, TokenType.DECREMENT):
            # Postfix ++/--: the expression yields the ORIGINAL value.
            self.eat(follower)

            if name not in self.local_vars:
                self.error(f"Undefined variable '{name}'")

            local_idx = self.local_vars[name]
            slot = struct.pack('<H', local_idx)
            step_opcode = Opcode.ADD if follower == TokenType.INCREMENT else Opcode.SUB

            self.emit(Opcode.LOAD_LOCAL, *slot)                    # original value (result)
            self.emit(Opcode.DUP)                                  # working copy
            self.emit(Opcode.PUSH_INT, 32, *struct.pack('<i', 1))  # constant 1
            self.emit(step_opcode)                                 # copy +/- 1
            self.emit(Opcode.STORE_LOCAL, *slot)                   # store; original stays on stack
        elif follower == TokenType.LPAREN:
            # Function call used as an expression.
            self.compile_call(name)
        else:
            # Plain variable read.
            if name not in self.local_vars:
                self.error(f"Undefined variable '{name}'")
            local_idx = self.local_vars[name]
            self.emit(Opcode.LOAD_LOCAL, *struct.pack('<H', local_idx))
        return

    if kind == TokenType.LPAREN:
        self.eat(TokenType.LPAREN)
        self.compile_expression()
        self.eat(TokenType.RPAREN)
        return

    self.error(f"Unexpected token in expression: {kind.name}")
|
|
|
|
def generate_bytecode(self) -> bytes:
    """Serialise the compiled program into the .popclass binary image.

    Layout (all multi-byte integers little-endian):

    * header: magic ``POPC``, u16 version major (1), u16 version minor (0),
      4 reserved zero bytes
    * constant pool: u32 count, then per constant a tag byte followed by
      - tag 2 (string): u32 byte length + UTF-8 bytes
      - tag 1 (float):  float32 value
      - tag 0 (int):    width byte (32), signed flag byte (0 for
        U8/U16/U32 type codes, otherwise 1), int32 value
    * function table: u32 count, then one 16-byte entry per function:
      u32 name index (placeholder 0xFFFFFFFF), u8 arg count, u8 local
      count, 2 reserved bytes, u32 code length, u32 code offset measured
      from the start of the image
    * code: every function body, concatenated in table order

    Returns:
        The complete image as an immutable ``bytes`` object.
    """
    image = bytearray(b'POPC')          # magic for .popclass files
    image += struct.pack('<HH', 1, 0)   # version major, version minor
    image += bytes(4)                   # reserved

    # ---- constant pool ----
    image += struct.pack('<I', len(self.constants))
    for type_code, value in self.constants:
        if type_code == TypeCode.STR:
            image.append(2)
            encoded = value.encode('utf-8')
            image += struct.pack('<I', len(encoded))
            image += encoded
        elif type_code == TypeCode.F32:
            image.append(1)
            image += struct.pack('<f', value)
        else:
            is_unsigned = type_code in [TypeCode.U8, TypeCode.U16, TypeCode.U32]
            image.append(0)                        # int tag
            image.append(32)                       # width
            image.append(0 if is_unsigned else 1)  # signed flag
            image += struct.pack('<i', value)

    # ---- function table ----
    functions = self.functions
    image += struct.pack('<I', len(functions))

    # Code starts right after the table; every table entry is 16 bytes.
    code_offset = len(image) + 16 * len(functions)

    for func in functions:
        code_length = len(func['code'])
        image += struct.pack('<I', 0xFFFFFFFF)  # name constant index (not assigned yet)
        image.append(func['arg_count'])
        image.append(func['local_count'])
        image += b'\x00\x00'                    # reserved
        image += struct.pack('<II', code_length, code_offset)
        code_offset += code_length

    # ---- code sections ----
    for func in functions:
        image.extend(func['code'])

    return bytes(image)
|
|
|
|
# ============================================================================
|
|
# MAIN COMMAND LINE INTERFACE
|
|
# ============================================================================
|
|
|
|
def compile_source(source_file: str, output_file: Optional[str] = None) -> bool:
    """Compile a source file into a .popclass bytecode file.

    Args:
        source_file: Path of the program text to compile (read as UTF-8).
        output_file: Destination path. When empty or None, the source
            path with its extension replaced by ``.popclass`` is used.

    Returns:
        True when the bytecode was written, False when the source file
        is missing or any error occurred. Failures are reported on
        stdout; unexpected exceptions also print a traceback.
    """
    if not os.path.exists(source_file):
        print(f"Error: Source file '{source_file}' not found")
        return False

    target = output_file or (os.path.splitext(source_file)[0] + '.popclass')

    try:
        with open(source_file, 'r', encoding='utf-8') as src:
            program_text = src.read()

        bytecode = Compiler(program_text, source_file).compile()

        with open(target, 'wb') as dst:
            dst.write(bytecode)

        print(f"Successfully compiled {source_file} to {target}")
        return True
    except SyntaxError as err:
        # Errors in the compiled program: report the location without a traceback.
        print(f"Compilation error in {err.filename}:{err.lineno}: {err.msg}")
        return False
    except Exception as err:
        print(f"Compilation error: {err}")
        traceback.print_exc()
        return False
|
|
|
|
def disassemble_file(popclass_file: str) -> bool:
    """Disassemble a .popclass file, save the listing and print it.

    The listing is written as UTF-8 to a sibling file with the same base
    name and a ``.popasm`` extension, then echoed to stdout.

    Args:
        popclass_file: Path of the compiled bytecode file.

    Returns:
        True on success, False when the file is missing or any error
        occurred (the error and a traceback are printed).
    """
    if not os.path.exists(popclass_file):
        print(f"Error: .popclass file '{popclass_file}' not found")
        return False

    try:
        with open(popclass_file, 'rb') as src:
            raw = src.read()

        listing = Disassembler(raw, popclass_file).disassemble()

        # Save the listing next to the input file.
        listing_path = os.path.splitext(popclass_file)[0] + '.popasm'
        with open(listing_path, 'w', encoding='utf-8') as dst:
            dst.write(listing)

        print(listing)
        return True
    except Exception as err:
        print(f"Disassembly error: {err}")
        traceback.print_exc()
        return False
|
|
|
|
def execute_popclass(popclass_file: str, debug: bool = False) -> bool:
    """Load a .popclass file and run it on the VM.

    Args:
        popclass_file: Path of the compiled bytecode file.
        debug: Forwarded to ``VM.run`` as its ``debug`` keyword.

    Returns:
        True when the run completed without raising, False when the
        file is missing or any error occurred (the error and a
        traceback are printed).
    """
    if not os.path.exists(popclass_file):
        print(f"Error: .popclass file '{popclass_file}' not found")
        return False

    try:
        with open(popclass_file, 'rb') as src:
            raw = src.read()

        machine = VM(raw, popclass_file)
        machine.run(debug=debug)
        return True
    except Exception as err:
        print(f"Execution error: {err}")
        traceback.print_exc()
        return False
|
|
|
|
def main():
    """Command-line entry point; dispatches on ``sys.argv``.

    Supported invocations:

    * ``compile <source_file> [output_file]`` -> ``compile_source``
    * ``disasm <popclass_file>``              -> ``disassemble_file``
    * ``run <popclass_file>``                 -> ``execute_popclass``
    * ``debug <popclass_file>``               -> ``execute_popclass`` with debug=True
    * ``<source_file>``                       -> compile to a temporary file, then run it

    With no arguments the usage text is printed. All diagnostics go to
    stdout and the function always returns None.

    The compile-and-run mode uses a uniquely named temporary file rather
    than a fixed ``temp.popclass`` in the current directory, so it cannot
    overwrite (and then delete) a user's file of that name and concurrent
    runs do not collide. The temporary file is removed even if execution
    raises.
    """
    import tempfile  # only needed by the compile-and-run path below

    args = sys.argv[1:]

    if not args:
        usage_lines = (
            "POP VM Tools - Enhanced with Type Safety and Better Error Messages",
            "Usage:",
            " python interpreter.py compile <source_file> [output_file]",
            " python interpreter.py disasm <popclass_file>",
            " python interpreter.py run <popclass_file>",
            " python interpreter.py debug <popclass_file>",
            " python interpreter.py <source_file> (compiles and runs)",
            "\nFeatures:",
            " - Improved type safety and error messages",
            " - C-style for loops with ++/-- operators",
            " - Better debugging information",
        )
        for line in usage_lines:
            print(line)
        return

    command = args[0]

    if command == 'compile':
        if len(args) < 2:
            print("Error: No source file specified")
            return
        output_file = args[2] if len(args) > 2 else None
        compile_source(args[1], output_file)
        return

    if command in ('disasm', 'run', 'debug'):
        if len(args) < 2:
            print("Error: No .popclass file specified")
            return
        popclass_file = args[1]
        if command == 'disasm':
            disassemble_file(popclass_file)
        else:
            execute_popclass(popclass_file, debug=(command == 'debug'))
        return

    # Anything else is treated as a source file: compile it, then run it.
    source_file = command
    if not os.path.exists(source_file):
        print(f"Error: Source file '{source_file}' not found")
        return

    try:
        fd, temp_file = tempfile.mkstemp(suffix='.popclass')
    except OSError as err:
        print(f"Error: Could not create temporary file: {err}")
        return
    # Only the unique path is needed; compile_source reopens it itself.
    os.close(fd)

    try:
        if compile_source(source_file, temp_file):
            execute_popclass(temp_file)
    finally:
        # Best-effort cleanup: a leftover temporary file is not worth failing over.
        try:
            os.remove(temp_file)
        except OSError:
            pass
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()