import re
from typing import Type, TypeVar, Any
import venusian
from pyramid.config import Configurator
from zope.interface import Interface
from zope.interface.interface import InterfaceClass
from zope.interface.interfaces import IInterface
from functools import update_wrapper
from pyramid_services import _type_name
import warnings
def _resolve_iface(obj):
"""
Resolve an object to an interface.
If the object is already an interface, return it.
Otherwise, create and cache a generated interface for it.
"""
# if the object is an interface then we can quit early
if IInterface.providedBy(obj):
return obj
# look for a cached iface
iface = obj.__dict__.get('_pyramid_di_iface', None)
if iface is not None:
return iface
# make a new iface and cache it on the object
name = _type_name(obj)
iface = InterfaceClass(
'%s_IService' % name,
__doc__='pyramid_di generated interface',
)
obj._pyramid_di_iface = iface
return iface
__version__ = "0.4.4"
_to_underscores = re.compile("((?<=[a-z0-9])[A-Z]|(?!^)[A-Z](?=[a-z]))")
class reify_attr(object):
"""
reify_attr is like pyramid reify, but instead of getting the name of the
attribute from the decorated method, it uses the name of actual attribute,
by finding it on the class in Python <=3.5, and using the ``__set_name__``
on Python 3.6.
"""
names = None
def __init__(self, wrapped):
self.wrapped = wrapped
update_wrapper(self, wrapped)
def __get__(self, inst, objtype=None):
if inst is None:
return self
if self.names is None:
raise TypeError(
f"reify_attr decorating {self.wrapped} not bound to a named attribute!"
)
val = self.wrapped(inst)
for name in self.names:
setattr(inst, name, val)
return val
def __set_name__(self, owner, name):
if self.names is None:
self.names = [name]
else:
self.names.append(name)
def _underscore(name):
return _to_underscores.sub(r"_\1", name).lower()
_is_iface_name = re.compile("^I[A-Z].*")
class ServiceRegistry(object):
def __init__(self):
self.__services__ = []
def _register_service(self, instance, interface):
self.__services__.append((instance, interface))
name = interface.__name__
if interface is _resolve_iface(interface) and _is_iface_name.match(name):
name = name[1:]
setattr(self, _underscore(name), instance)
def get_service_registry(registry):
if not hasattr(registry, "services"):
registry.services = ServiceRegistry()
return registry.services
def register_di_service(
config: Configurator,
service_factory,
*,
scope,
interface=Interface,
name="",
context_iface=Interface,
):
registry = config.registry
if scope == "global":
warnings.warn(
"The use of 'global' as scope parameter is deprecated. Use 'application' instead",
DeprecationWarning,
)
scope = "application"
if scope == "application":
# register only once
real_interface = _resolve_iface(interface)
if registry.queryUtility(real_interface, name=name) is None:
ob_instance = service_factory(registry=registry)
get_service_registry(registry)._register_service(ob_instance, interface)
registry.registerUtility(ob_instance, real_interface, name=name)
config.register_service(
service=ob_instance, iface=interface, context=context_iface, name=name
)
else:
warnings.warn(
f"Double registration of the same service {interface}"
f" with name {name!r} attempted"
)
else:
# noinspection PyUnusedLocal
def wrapped_factory(context, request):
return service_factory(request=request)
config.register_service_factory(
wrapped_factory, interface, context_iface, name=name
)
_NOT_SET = object()
[docs]
def service(interface=None, *, name="", context_iface=Interface, scope=_NOT_SET):
if scope not in {"global", "application", "request", _NOT_SET}:
raise ValueError(
f"Invalid scope {scope}, must be either 'application' or 'request'"
)
if scope == "global":
warnings.warn(
"The use of 'global' as scope parameter is deprecated. Use 'application' instead",
DeprecationWarning,
)
service_name = name
def service_decorator(wrapped):
# noinspection PyUnusedLocal,PyShadowingNames
def callback(scanner, name, ob):
config = scanner.config
iface = interface
if iface is None:
if service_name:
iface = Interface
else:
iface = ob
actual_scope = scope
if actual_scope is _NOT_SET:
actual_scope = getattr(ob, "__pyramid_di_scope__", None)
if actual_scope is None:
raise TypeError(f"Cannot infer service scope for {ob}")
if actual_scope not in {"application", "request"}:
raise TypeError(
f"{ob} has invalid scope {actual_scope}, must be either 'application' or 'request'"
)
config.register_di_service(
ob,
name=service_name,
interface=iface,
context_iface=context_iface,
scope=actual_scope,
)
venusian.attach(wrapped, callback, category="pyramid_di.service")
return wrapped
return service_decorator
T = TypeVar("T", bound=object)
[docs]
def autowired(interface: Type[T] = Interface, name: str = "") -> T:
def getter(self) -> T:
if hasattr(self, "request"):
# the context-specific services wouldn't work anyway, so do it this way
return self.request.find_service(interface, None, name)
return self.registry.getUtility(_resolve_iface(interface), name)
return reify_attr(getter)
[docs]
class ApplicationScopedBaseService(object):
"""
A convenience base class for application-scoped services.
"""
__pyramid_di_scope__ = "application"
registry: "pyramid.registry.Registry"
def __init__(self, *, registry: "pyramid.registry.Registry", **kw):
self.registry = registry
super(ApplicationScopedBaseService, self).__init__(**kw)
[docs]
class BaseService(ApplicationScopedBaseService):
def __init__(self, **kw):
warnings.warn(
"BaseService has been renamed to ApplicationScopedBaseService",
DeprecationWarning,
2,
)
super(BaseService, self).__init__(**kw)
def __init_subclass__(self):
warnings.warn(
"BaseService has been renamed to ApplicationScopedBaseService",
DeprecationWarning,
2,
)
[docs]
class RequestScopedBaseService(object):
"""
A convenience class for request-scoped services.
"""
__pyramid_di_scope__ = "request"
request: "pyramid.request.Request"
registry: "pyramid.registry.Registry"
context: Any
def __init__(self, *, request: "pyramid.request.Request", **kw):
self.request = request
self.context = getattr(request, "context", None)
self.registry = request.registry
super(RequestScopedBaseService, self).__init__(**kw)
def scan_services(config, *a, **kw):
kw["categories"] = ("pyramid_di.service",)
return config.scan(*a, **kw)
def includeme(config):
config.include("pyramid_services")
config.add_directive("scan_services", scan_services)
config.add_directive("register_di_service", register_di_service)
get_service_registry(config.registry)