|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
Decorator module, see |
|
https://github.com/micheles/decorator/blob/master/docs/documentation.md |
|
for the documentation. |
|
""" |
|
import re |
|
import sys |
|
import inspect |
|
import operator |
|
import itertools |
|
from contextlib import _GeneratorContextManager |
|
from inspect import getfullargspec, iscoroutinefunction, isgeneratorfunction |
|
|
|
# Module version string.
__version__ = '5.1.1'

# Pattern locating a ``def <name>(`` header; group 1 captures the function
# name. FunctionMaker.make uses it to find the name defined by a template.
DEF = re.compile(r'\s*def\s*([_\w][_\w\d]*)\s*\(')
# Short alias for the "positional or keyword" parameter kind of inspect.
POS = inspect.Parameter.POSITIONAL_OR_KEYWORD
# Short alias for the marker inspect uses when a parameter has no default.
EMPTY = inspect.Parameter.empty
|
|
|
|
|
|
|
class FunctionMaker(object):
    """
    An object with the ability to create functions with a given signature.
    It has attributes name, doc, module, signature, defaults, dict and
    methods update and make.
    """

    # Counter shared by all instances; ``make`` draws from it so that every
    # compiled function gets a distinct pseudo-filename.
    _compile_count = itertools.count()

    # Class-level fallbacks, so these attributes exist even when no routine
    # was inspected in ``__init__``.
    args = varargs = varkw = defaults = kwonlyargs = kwonlydefaults = ()

    def __init__(self, func=None, name=None, signature=None,
                 defaults=None, doc=None, module=None, funcdict=None):
        """
        Collect the data describing the function to generate.

        If ``func`` is truthy its name, docstring and module are copied; if
        it is also a routine, its argument specification is inspected to
        build ``signature`` and ``shortsignature``. Any truthy explicit
        argument (``name``, ``defaults``, ``doc``, ``module``, ``funcdict``)
        and any non-None ``signature`` then overrides the copied value.

        Raises AssertionError if no name is available and TypeError if no
        signature could be determined.
        """
        self.shortsignature = signature
        if func:
            self.name = func.__name__
            if self.name == '<lambda>':
                # '<lambda>' is not a valid identifier for generated code
                self.name = '_lambda_'
            self.doc = func.__doc__
            self.module = func.__module__
            if inspect.isroutine(func):
                argspec = getfullargspec(func)
                self.annotations = getattr(func, '__annotations__', {})
                for a in ('args', 'varargs', 'varkw', 'defaults', 'kwonlyargs',
                          'kwonlydefaults'):
                    setattr(self, a, getattr(argspec, a))
                # expose each positional argument name as arg0, arg1, ...
                for i, arg in enumerate(self.args):
                    setattr(self, 'arg%d' % i, arg)
                allargs = list(self.args)
                allshortargs = list(self.args)
                if self.varargs:
                    allargs.append('*' + self.varargs)
                    allshortargs.append('*' + self.varargs)
                elif self.kwonlyargs:
                    # a bare star is needed before keyword-only arguments
                    allargs.append('*')
                for a in self.kwonlyargs:
                    # the real keyword-only defaults are restored by
                    # ``update`` through __kwdefaults__
                    allargs.append('%s=None' % a)
                    allshortargs.append('%s=%s' % (a, a))
                if self.varkw:
                    allargs.append('**' + self.varkw)
                    allshortargs.append('**' + self.varkw)
                self.signature = ', '.join(allargs)
                self.shortsignature = ', '.join(allshortargs)
                self.dict = func.__dict__.copy()

        # explicit arguments take precedence over what was read from func
        if name:
            self.name = name
        if signature is not None:
            self.signature = signature
        if defaults:
            self.defaults = defaults
        if doc:
            self.doc = doc
        if module:
            self.module = module
        if funcdict:
            self.dict = funcdict

        # check the existence of the required attributes
        assert hasattr(self, 'name')
        if not hasattr(self, 'signature'):
            raise TypeError('You are decorating a non function: %s' % func)

    def update(self, func, **kw):
        """
        Update the signature of func with the data in self

        The name, docstring, attribute dictionary, defaults, keyword-only
        defaults, annotations and module of ``func`` are overwritten with
        the values stored on this object; the extra keyword arguments are
        then set as attributes of ``func``.
        """
        func.__name__ = self.name
        func.__doc__ = getattr(self, 'doc', None)
        # Give each function its own copy: assigning self.dict directly made
        # all functions built from this maker (and a caller-supplied
        # funcdict) share one dictionary, so later attributes leaked back.
        func.__dict__ = dict(getattr(self, 'dict', {}))
        func.__defaults__ = self.defaults
        func.__kwdefaults__ = self.kwonlydefaults or None
        func.__annotations__ = getattr(self, 'annotations', None)
        try:
            frame = sys._getframe(3)
        except (AttributeError, ValueError):
            # AttributeError: the interpreter has no sys._getframe;
            # ValueError: the call stack is fewer than three frames deep.
            callermodule = '?'
        else:
            callermodule = frame.f_globals.get('__name__', '?')
        func.__module__ = getattr(self, 'module', callermodule)
        # Use setattr rather than func.__dict__.update(kw): a name such as
        # __qualname__ is a descriptor of the function type, so a plain
        # dictionary entry would be shadowed and never take effect.
        for attr_name, attr_value in kw.items():
            setattr(func, attr_name, attr_value)

    def make(self, src_templ, evaldict=None, addsource=False, **attrs):
        """
        Make a new function from a given template and update the signature

        ``src_templ`` is %-interpolated with the attributes of this object,
        compiled and executed inside ``evaldict``. The resulting function is
        passed to ``update`` together with ``attrs``; if ``addsource`` is
        true the generated source is added as ``__source__``.

        Raises SyntaxError if the template does not define a function and
        NameError if the function name or one of its arguments is called
        ``_func_`` or ``_call_``.
        """
        src = src_templ % vars(self)  # expand name and signature
        evaldict = evaldict or {}
        mo = DEF.search(src)
        if mo is None:
            raise SyntaxError('not a valid function template\n%s' % src)
        name = mo.group(1)  # the name of the function being defined
        names = {name}
        names.update(arg.strip(' *')
                     for arg in self.shortsignature.split(','))
        # these two names are reserved for the objects injected in evaldict
        for n in names:
            if n in ('_func_', '_call_'):
                raise NameError('%s is overridden in\n%s' % (n, src))

        if not src.endswith('\n'):
            src += '\n'

        # every generated function is compiled under its own filename
        filename = '<decorator-gen-%d>' % next(self._compile_count)
        try:
            code = compile(src, filename, 'single')
            exec(code, evaldict)
        except Exception:
            # show the offending source before propagating the error
            print('Error in generated code:', file=sys.stderr)
            print(src, file=sys.stderr)
            raise
        func = evaldict[name]
        if addsource:
            attrs['__source__'] = src
        self.update(func, **attrs)
        return func

    @classmethod
    def create(cls, obj, body, evaldict, defaults=None,
               doc=None, module=None, addsource=True, **attrs):
        """
        Create a function from the strings name, signature and body.
        evaldict is the evaluation dictionary. If addsource is true an
        attribute __source__ is added to the result. The attributes attrs
        are added, if any.

        ``obj`` is either a string of the form "name(signature)" or a
        function from which name and signature are extracted.
        """
        if isinstance(obj, str):
            name, rest = obj.strip().split('(', 1)
            signature = rest[:-1]  # drop the closing parenthesis
            func = None
        else:
            name = None
            signature = None
            func = obj
        self = cls(func, name, signature, defaults, doc, module)
        # indent the body so that it sits inside the generated def
        ibody = '\n'.join(' ' + line for line in body.splitlines())
        caller = evaldict.get('_call_')
        if caller and iscoroutinefunction(caller):
            # NOTE: plain textual replacement, every occurrence of 'return'
            # in the template is rewritten.
            body = ('async def %(name)s(%(signature)s):\n' + ibody).replace(
                'return', 'return await')
        else:
            body = 'def %(name)s(%(signature)s):\n' + ibody
        return self.make(body, evaldict, addsource, **attrs)
|
|
|
|
|
def fix(args, kwargs, sig):
    """
    Normalize a call against the signature ``sig``.

    The arguments are bound to the signature, missing ones are filled in
    with their defaults, and the resulting positional tuple and keyword
    dictionary are returned as a pair.
    """
    bound = sig.bind(*args, **kwargs)
    bound.apply_defaults()
    positional, keywords = bound.args, bound.kwargs
    return positional, keywords
|
|
|
|
|
def decorate(func, caller, extras=(), kwsyntax=False):
    """
    Wrap a function/generator/coroutine with a caller.

    The returned wrapper invokes ``caller(func, *extras, *args, **kw)``.
    When ``kwsyntax`` is false the call arguments are first normalized
    against the signature of ``func`` by ``fix``; when it is true they are
    forwarded exactly as they were passed. The wrapper is a coroutine
    function or a generator function when ``caller`` is one, and it
    receives the metadata of ``func`` (name, docstring, signature, ...).
    """
    signature = inspect.signature(func)

    if iscoroutinefunction(caller):
        async def fun(*args, **kw):
            args, kw = (args, kw) if kwsyntax else fix(args, kw, signature)
            return await caller(func, *(extras + args), **kw)
    elif isgeneratorfunction(caller):
        def fun(*args, **kw):
            args, kw = (args, kw) if kwsyntax else fix(args, kw, signature)
            for item in caller(func, *(extras + args), **kw):
                yield item
    else:
        def fun(*args, **kw):
            args, kw = (args, kw) if kwsyntax else fix(args, kw, signature)
            return caller(func, *(extras + args), **kw)

    # metadata that func is required to provide
    fun.__name__ = func.__name__
    fun.__doc__ = func.__doc__
    fun.__wrapped__ = func
    fun.__signature__ = signature
    fun.__qualname__ = func.__qualname__

    # metadata that is copied only when func actually has it
    optional = ('__defaults__', '__kwdefaults__', '__annotations__',
                '__module__')
    for attribute in optional:
        try:
            setattr(fun, attribute, getattr(func, attribute))
        except AttributeError:
            pass
    try:
        fun.__dict__.update(func.__dict__)
    except AttributeError:
        pass
    return fun
|
|
|
|
|
def decoratorx(caller):
    """
    Variant of "decorator" that builds the wrapper with "exec" (through
    FunctionMaker) instead of relying on the Signature object. Use it when
    you want to preserve the properties of the `.__code__` object
    (https://github.com/micheles/decorator/issues/129).
    """
    def dec(func):
        template = "return _call_(_func_, %(shortsignature)s)"
        # names made available to the generated function
        namespace = dict(_call_=caller, _func_=func)
        return FunctionMaker.create(
            func, template, namespace,
            __wrapped__=func, __qualname__=func.__qualname__)
    return dec
|
|
|
|
|
def decorator(caller, _func=None, kwsyntax=False):
    """
    decorator(caller) converts a caller function into a decorator.

    When ``_func`` is given, it is decorated immediately and the wrapper
    is returned. Otherwise a decorator ``dec`` is returned; it can be
    applied directly to a function or called first with extra arguments
    (with ``func`` left as None), which are forwarded to ``caller`` after
    the decorated function.
    """
    if _func is not None:
        return decorate(_func, caller, (), kwsyntax)

    caller_sig = inspect.signature(caller)
    dec_params = []
    for param in caller_sig.parameters.values():
        if param.kind is POS:
            dec_params.append(param)

    def dec(func=None, *args, **kw):
        # parameters already covered by func and the positional extras
        taken = 1 + len(args)
        from_keywords = [kw.get(param.name, param.default)
                         for param in dec_params[taken:]
                         if param.default is not EMPTY]
        extras = args + tuple(from_keywords)
        if func is not None:
            return decorate(func, caller, extras, kwsyntax)
        return lambda func: decorate(func, caller, extras, kwsyntax)

    # make dec look like the caller it was built from
    dec.__signature__ = caller_sig.replace(parameters=dec_params)
    dec.__name__ = caller.__name__
    dec.__doc__ = caller.__doc__
    dec.__wrapped__ = caller
    dec.__qualname__ = caller.__qualname__
    dec.__kwdefaults__ = getattr(caller, '__kwdefaults__', None)
    dec.__dict__.update(caller.__dict__)
    return dec
|
|
|
|
|
|
|
|
|
|
|
class ContextManager(_GeneratorContextManager):
    """
    Generator-based context manager whose instances also work as
    decorators.
    """

    def __init__(self, g, *a, **k):
        """Store the generator function ``g`` and its call arguments."""
        _GeneratorContextManager.__init__(self, g, a, k)

    def __call__(self, func):
        """
        Decorate ``func`` so that every call runs inside a new context
        manager built from the stored function and arguments.
        """
        def caller(f, *a, **k):
            manager = self.__class__(self.func, *self.args, **self.kwds)
            with manager:
                return f(*a, **k)
        return decorate(func, caller)
|
|
|
|
|
# Decorator obtained by using the ContextManager class itself as the caller.
_contextmanager = decorator(ContextManager)


def contextmanager(func):
    """
    Apply ``_contextmanager`` to ``func`` and return the result.
    """
    # NOTE(review): kept as an explicit function instead of a plain alias of
    # _contextmanager; presumably so that tools see a regular def - confirm.
    return _contextmanager(func)
|
|
|
|
|
|
|
|
|
def append(a, vancestors):
    """
    Add the class ``a`` to the list ``vancestors`` in place, keeping only
    the most specific classes: nothing happens if an entry that is a
    subclass of ``a`` is met, while entries that ``a`` is a subclass of are
    replaced by ``a``. ``a`` is appended only if neither case occurred.
    """
    absorbed = False
    for position, existing in enumerate(vancestors):
        if issubclass(existing, a):
            # a more specific (or identical) class is already listed
            return
        if issubclass(a, existing):
            vancestors[position] = a
            absorbed = True
    if not absorbed:
        vancestors.append(a)
|
|
|
|
|
|
|
def dispatch_on(*dispatch_args):
    """
    Factory of decorators turning a function into a generic function
    dispatching on the given arguments.

    ``dispatch_args`` are the names of the arguments whose types select the
    implementation. The returned decorator raises NameError if the
    decorated function does not have arguments with those names.
    """
    assert dispatch_args, 'No dispatch args passed'
    # tuple expression inserted verbatim into the generated source
    dispatch_str = '(%s,)' % ', '.join(dispatch_args)

    def check(arguments, wrong=operator.ne, msg=''):
        """Make sure one passes the expected number of arguments"""
        if wrong(len(arguments), len(dispatch_args)):
            raise TypeError('Expected %d arguments, got %d%s' %
                            (len(dispatch_args), len(arguments), msg))

    def gen_func_dec(func):
        """Decorator turning a function into a generic function"""

        # the dispatch arguments must be arguments of the function
        argset = set(getfullargspec(func).args)
        if not set(dispatch_args) <= argset:
            raise NameError('Unknown dispatch arguments %s' % dispatch_str)

        # maps tuples of types to the registered implementations
        typemap = {}

        def vancestors(*types):
            """
            Get a list of sets of virtual ancestors for the given types
            """
            check(types)
            ras = [[] for _ in range(len(dispatch_args))]
            for types_ in typemap:
                for t, type_, ra in zip(types, types_, ras):
                    # virtual ancestor: a superclass according to
                    # issubclass which does not appear in the real MRO
                    if issubclass(t, type_) and type_ not in t.mro():
                        append(type_, ra)
            return [set(ra) for ra in ras]

        def ancestors(*types):
            """
            Get a list of virtual MROs, one for each type
            """
            check(types)
            lists = []
            for t, vas in zip(types, vancestors(*types)):
                n_vas = len(vas)
                if n_vas > 1:
                    raise RuntimeError(
                        'Ambiguous dispatch for %s: %s' % (t, vas))
                elif n_vas == 1:
                    va, = vas
                    # MRO of a throwaway class deriving from both, without
                    # the throwaway class itself
                    mro = type('t', (t, va), {}).mro()[1:]
                else:
                    mro = t.mro()
                lists.append(mro[:-1])  # discard the last entry of the MRO
            return lists

        def register(*types):
            """
            Decorator to register an implementation for the given types
            """
            check(types)

            def dec(f):
                # f must accept at least as many arguments as are dispatched
                check(getfullargspec(f).args, operator.lt, ' in ' + f.__name__)
                typemap[types] = f
                return f
            return dec

        def dispatch_info(*types):
            """
            An utility to introspect the dispatch algorithm
            """
            check(types)
            lst = []
            for anc in itertools.product(*ancestors(*types)):
                lst.append(tuple(a.__name__ for a in anc))
            return lst

        def _dispatch(dispatch_args, *args, **kw):
            """
            Call the implementation registered for the types of the values
            in ``dispatch_args``, falling back to the decorated function.
            """
            types = tuple(type(arg) for arg in dispatch_args)
            try:  # fast path: an implementation for the exact types
                f = typemap[types]
            except KeyError:
                pass
            else:
                return f(*args, **kw)
            combinations = itertools.product(*ancestors(*types))
            # The first combination is ``types`` itself, already tried
            # above. The product is empty when one of the ancestor lists is
            # empty (e.g. for a plain object() argument): pass a default so
            # that the call falls back to func instead of leaking
            # StopIteration.
            next(combinations, None)
            for types_ in combinations:
                f = typemap.get(types_)
                if f is not None:
                    return f(*args, **kw)

            # no registered implementation matched: use the default
            return func(*args, **kw)

        return FunctionMaker.create(
            func, 'return _f_(%s, %%(shortsignature)s)' % dispatch_str,
            dict(_f_=_dispatch), register=register, default=func,
            typemap=typemap, vancestors=vancestors, ancestors=ancestors,
            dispatch_info=dispatch_info, __wrapped__=func)

    gen_func_dec.__name__ = 'dispatch_on' + dispatch_str
    return gen_func_dec
|
|