Files
mucapy/venv/lib/python3.12/site-packages/q.py
rattatwinko 8e9b2568b2 initial
2025-05-25 20:42:58 +02:00

401 lines
15 KiB
Python

# Copyright 2012 Google Inc. All Rights Reserved.
# vim: set ts=4 sw=4 et sts=4 ai:
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at: http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distrib-
# uted under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. See the License for
# specific language governing permissions and limitations under the License.
"""Quick and dirty debugging output for tired programmers.
All output goes to /tmp/q, which you can watch with this shell command:
tail -f /tmp/q
If TMPDIR is set, the output goes to $TMPDIR/q.
To print the value of foo, insert this into your program:
import q; q(foo)
To print the value of something in the middle of an expression, insert
"q()", "q/", or "q|". For example, given this statement:
file.write(prefix + (sep or '').join(items))
...you can print out various values without using any temporary variables:
file.write(prefix + q(sep or '').join(items)) # prints (sep or '')
file.write(q/prefix + (sep or '').join(items)) # prints prefix
file.write(q|prefix + (sep or '').join(items)) # prints the arg to write
To trace a function's arguments and return value, insert this above the def:
import q
@q
To start an interactive console at any point in your code, call q.d():
import q; q.d()
"""
from __future__ import print_function
import sys
__author__ = 'Ka-Ping Yee <ping@zesty.ca>'
# WARNING: Horrible abuse of sys.modules, __call__, __div__, __or__, inspect,
# sys._getframe, and more! q's behaviour changes depending on the text of the
# source code near its call site. Don't ever do this in real code!
# These are reused below in both Q and Writer.
ESCAPE_SEQUENCES = ['\x1b[0m'] + ['\x1b[3%dm' % i for i in range(1, 7)]
if sys.version_info >= (3,):
BASESTRING_TYPES = (str, bytes)
TEXT_TYPES = (str,)
else:
BASESTRING_TYPES = (basestring,)
TEXT_TYPES = (unicode,)
# When we insert Q() into sys.modules, all the globals become None, so we
# have to keep everything we use inside the Q class.
class Q(object):
__doc__ = __doc__ # from the module's __doc__ above
import ast
import code
import inspect
import os
import pydoc
import sys
import random
import re
import time
import functools
import tempfile
# The debugging log will go to this file; temporary files will also have
# this path as a prefix, followed by a random number.
OUTPUT_PATH = os.path.join(tempfile.gettempdir(), 'q')
NORMAL, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN = ESCAPE_SEQUENCES
TEXT_REPR = pydoc.TextRepr()
q_max_length = 1_000_000
@property
def short(self):
__class__.TEXT_REPR = __class__.pydoc.TextRepr()
@property
def long(self):
__class__.TEXT_REPR = __class__.pydoc.TextRepr()
__class__.TEXT_REPR.maxarray = __class__.q_max_length
__class__.TEXT_REPR.maxdeque = __class__.q_max_length
__class__.TEXT_REPR.maxdict = __class__.q_max_length
__class__.TEXT_REPR.maxfrozenset = __class__.q_max_length
__class__.TEXT_REPR.maxlevel = __class__.q_max_length
__class__.TEXT_REPR.maxlist = __class__.q_max_length
__class__.TEXT_REPR.maxlong = __class__.q_max_length
__class__.TEXT_REPR.maxother = __class__.q_max_length
__class__.TEXT_REPR.maxset = __class__.q_max_length
__class__.TEXT_REPR.maxstring = __class__.q_max_length
__class__.TEXT_REPR.maxtuple = __class__.q_max_length
@long.setter
def long(self, value):
__class__.q_max_length = value
self.long
# For portably converting strings between python2 and python3
BASESTRING_TYPES = BASESTRING_TYPES
TEXT_TYPES = TEXT_TYPES
class FileWriter(object):
"""An object that appends to or overwrites a single file."""
import sys
# For portably converting strings between python2 and python3
BASESTRING_TYPES = BASESTRING_TYPES
TEXT_TYPES = TEXT_TYPES
def __init__(self, path):
self.path = path
self.open = open
# App Engine's dev_appserver patches 'open' to simulate security
# restrictions in production; we circumvent this to write output.
if open.__name__ == 'FakeFile': # dev_appserver's patched 'file'
self.open = open.__bases__[0] # the original built-in 'file'
def write(self, mode, content):
if 'b' not in mode:
mode = '%sb' % mode
if (isinstance(content, self.BASESTRING_TYPES) and
isinstance(content, self.TEXT_TYPES)):
content = content.encode('utf-8')
try:
f = self.open(self.path, mode)
f.write(content)
f.close()
except IOError:
pass
class Writer:
"""Abstract away the output pipe, timestamping, and color support."""
NORMAL, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN = ESCAPE_SEQUENCES
def __init__(self, file_writer, time):
self.color = True
self.file_writer = file_writer
self.gap_seconds = 2
self.time = time # the 'time' module (needed because no globals)
self.start_time = self.time.time()
self.last_write = 0
def write(self, chunks):
"""Writes out a list of strings as a single timestamped unit."""
if not self.color:
chunks = [x for x in chunks if not x.startswith('\x1b')]
content = ''.join(chunks)
now = self.time.time()
prefix = '%4.1fs ' % ((now - self.start_time) % 100)
indent = ' ' * len(prefix)
if self.color:
prefix = self.YELLOW + prefix + self.NORMAL
if now - self.last_write >= self.gap_seconds:
prefix = '\n' + prefix
self.last_write = now
output = prefix + content.replace('\n', '\n' + indent)
self.file_writer.write('a', output + '\n')
class Stanza:
"""Abstract away indentation and line-wrapping."""
def __init__(self, indent=0, width=80 - 7):
self.chunks = [' ' * indent]
self.indent = indent
self.column = indent
self.width = width
def newline(self):
if len(self.chunks) > 1:
self.column = self.width
def add(self, items, sep='', wrap=True):
"""Adds a list of strings that are to be printed on one line."""
items = list(map(str, items))
size = sum([len(x) for x in items if not x.startswith('\x1b')])
if (wrap and self.column > self.indent and
self.column + len(sep) + size > self.width):
self.chunks.append(sep.rstrip() + '\n' + ' ' * self.indent)
self.column = self.indent
else:
self.chunks.append(sep)
self.column += len(sep)
self.chunks.extend(items)
self.column += size
def __init__(self):
self.writer = self.Writer(self.FileWriter(self.OUTPUT_PATH), self.time)
self.indent = 0
# in_console tracks whether we're in an interactive console.
# We use it to display the caller as "<console>" instead of "<module>".
self.in_console = False
def unindent(self, lines):
"""Removes any indentation that is common to all of the given lines."""
indent = min(
len(self.re.match(r'^ *', line).group()) for line in lines)
return [line[indent:].rstrip() for line in lines]
def safe_repr(self, value):
# TODO: Use colour to distinguish '...' elision from actual '...'
# TODO: Show a nicer repr for SRE.Match objects.
# TODO: Show a nicer repr for big multiline strings.
result = self.TEXT_REPR.repr(value)
if isinstance(value, self.BASESTRING_TYPES) and len(value) > 80:
# If the string is big, save it to a file for later examination.
if isinstance(value, self.TEXT_TYPES):
value = value.encode('utf-8')
path = self.OUTPUT_PATH + '%08d.txt' % self.random.randrange(1e8)
self.FileWriter(path).write('w', value)
result += ' (file://' + path + ')'
return result
def get_call_exprs(self, line):
"""Gets the argument expressions from the source of a function call."""
line = line.lstrip()
try:
tree = self.ast.parse(line)
except SyntaxError:
return None
for node in self.ast.walk(tree):
if isinstance(node, self.ast.Call):
offsets = []
for arg in node.args:
# In Python 3.4 the col_offset is calculated wrong. See
# https://bugs.python.org/issue21295
if isinstance(arg, self.ast.Attribute) and (
(3, 4, 0) <= self.sys.version_info <= (3, 4, 3)):
offsets.append(arg.col_offset - len(arg.value.id) - 1)
else:
offsets.append(arg.col_offset)
if node.keywords:
line = line[:node.keywords[0].value.col_offset]
line = self.re.sub(r'\w+\s*=\s*$', '', line)
else:
line = self.re.sub(r'\s*\)\s*$', '', line)
offsets.append(len(line))
args = []
for i in range(len(node.args)):
args.append(line[offsets[i]:offsets[i + 1]].rstrip(', '))
return args
def show(self, func_name, values, labels=None):
"""Prints out nice representations of the given values."""
s = self.Stanza(self.indent)
if func_name == '<module>' and self.in_console:
func_name = '<console>'
s.add([func_name + ': '])
reprs = map(self.safe_repr, values)
if labels:
sep = ''
for label, repr in zip(labels, reprs):
s.add([label + '=', self.CYAN, repr, self.NORMAL], sep)
sep = ', '
else:
sep = ''
for repr in reprs:
s.add([self.CYAN, repr, self.NORMAL], sep)
sep = ', '
self.writer.write(s.chunks)
def trace(self, func):
"""Decorator to print out a function's arguments and return value."""
def wrapper(*args, **kwargs):
# Print out the call to the function with its arguments.
s = self.Stanza(self.indent)
s.add([self.GREEN, func.__name__, self.NORMAL, '('])
s.indent += 4
sep = ''
for arg in args:
s.add([self.CYAN, self.safe_repr(arg), self.NORMAL], sep)
sep = ', '
for name, value in sorted(kwargs.items()):
s.add([name + '=', self.CYAN, self.safe_repr(value),
self.NORMAL], sep)
sep = ', '
s.add(')', wrap=False)
self.writer.write(s.chunks)
# Call the function.
self.indent += 2
try:
result = func(*args, **kwargs)
except:
# Display an exception.
self.indent -= 2
etype, evalue, etb = self.sys.exc_info()
info = self.inspect.getframeinfo(etb.tb_next, context=3)
s = self.Stanza(self.indent)
s.add([self.RED, '!> ', self.safe_repr(evalue), self.NORMAL])
s.add(['at ', info.filename, ':', info.lineno], ' ')
lines = self.unindent(info.code_context)
firstlineno = info.lineno - info.index
fmt = '%' + str(len(str(firstlineno + len(lines)))) + 'd'
for i, line in enumerate(lines):
s.newline()
s.add([
i == info.index and self.MAGENTA or '',
fmt % (i + firstlineno),
i == info.index and '> ' or ': ', line, self.NORMAL])
self.writer.write(s.chunks)
raise
# Display the return value.
self.indent -= 2
s = self.Stanza(self.indent)
s.add([self.GREEN, '-> ', self.CYAN, self.safe_repr(result),
self.NORMAL])
self.writer.write(s.chunks)
return result
return self.functools.update_wrapper(wrapper, func)
def __call__(self, *args):
"""If invoked as a decorator on a function, adds tracing output to the
function; otherwise immediately prints out the arguments."""
info = self.inspect.getframeinfo(self.sys._getframe(1), context=9)
# info.index is the index of the line containing the end of the call
# expression, so this gets a few lines up to the end of the expression.
lines = ['']
if info.code_context:
lines = info.code_context[:info.index + 1]
# If we see "@q" on a single line, behave like a trace decorator.
for line in lines:
if line.strip() in ('@q', '@q()') and args:
return self.trace(args[0])
# Otherwise, search for the beginning of the call expression; once it
# parses, use the expressions in the call to label the debugging
# output.
for i in range(1, len(lines) + 1):
labels = self.get_call_exprs(''.join(lines[-i:]).replace('\n', ''))
if labels:
break
self.show(info.function, args, labels)
return args and args[0]
def __truediv__(self, arg): # a tight-binding operator
"""Prints out and returns the argument."""
info = self.inspect.getframeinfo(self.sys._getframe(1))
self.show(info.function, [arg])
return arg
# Compat for Python 2 without from future import __division__ turned on
__div__ = __truediv__
__or__ = __div__ # a loose-binding operator
q = __call__ # backward compatibility with @q.q
t = trace # backward compatibility with @q.t
__name__ = 'Q' # App Engine's import hook dies if this isn't present
def d(self, depth=1):
"""Launches an interactive console at the point where it's called."""
info = self.inspect.getframeinfo(self.sys._getframe(1))
s = self.Stanza(self.indent)
s.add([info.function + ': '])
s.add([self.MAGENTA, 'Interactive console opened', self.NORMAL])
self.writer.write(s.chunks)
frame = self.sys._getframe(depth)
env = frame.f_globals.copy()
env.update(frame.f_locals)
self.indent += 2
self.in_console = True
self.code.interact(
'Python console opened by q.d() in ' + info.function, local=env)
self.in_console = False
self.indent -= 2
s = self.Stanza(self.indent)
s.add([info.function + ': '])
s.add([self.MAGENTA, 'Interactive console closed', self.NORMAL])
self.writer.write(s.chunks)
# Install the Q() object in sys.modules so that "import q" gives a callable q.
q = Q()
q.long
sys.modules['q'] = q