# Source code for svarog.forges

import inspect
from functools import lru_cache
from typing import Any
from typing import ForwardRef
from typing import List
from typing import Mapping
from typing import Optional
from typing import Type
from typing import TypeVar
from typing import Union

from .checks import is_bare
from .compat import get_args
from .tools import camel_to_snake
from .types import CannotDispatch
from .types import Forge
from .types import NoneType


T = TypeVar("T")


@lru_cache()
def _cached_defaults(thingy):
    """Return the declared default values of a callable's parameters.

    The result maps parameter name to default value for every parameter of
    ``thingy`` that declares a default; parameters without one are omitted.

    The call is memoised with ``lru_cache``, so ``thingy`` must be hashable
    and repeated calls hand back the same dict object -- copy it before
    mutating.
    """
    # Compare by identity against the public sentinel.  Using ``!=`` would
    # run the default value's own ``__ne__``, which drops (or blows up on)
    # defaults with unusual equality semantics.
    empty = inspect.Parameter.empty
    return {
        name: parameter.default
        for name, parameter in inspect.signature(thingy).parameters.items()
        if parameter.default is not empty
    }


def _clean_annotations(annotated):
    """Yield ``(name, annotation)`` pairs for a function.

    * If the function carries no annotations at all, every named positional
      or keyword-only parameter except ``self`` is yielded with ``Any``.
      ``*args`` / ``**kwargs`` names are not yielded, since they cannot be
      filled by a same-named keyword argument.
    * Otherwise each entry of ``__annotations__`` is yielded in order
      (including ``"return"`` when present); string annotations are wrapped
      in ``ForwardRef``.
    """
    annotations = annotated.__annotations__
    if not annotations:
        code = annotated.__code__
        # ``co_varnames`` lists the arguments first and the body's local
        # variables after them; cut it at the declared parameter count so
        # locals are never mistaken for parameters.
        declared = code.co_argcount + code.co_kwonlyargcount
        for varname in code.co_varnames[:declared]:
            if varname != "self":
                yield varname, Any
        return

    for name, value in annotations.items():
        yield name, (ForwardRef(value) if isinstance(value, str) else value)


def forge_none(type_: NoneType, data: Any, forge: Forge) -> None:
    """Forge a value for the ``None`` type.

    None of the three arguments is inspected; the result is always ``None``.
    """
    return None
def forge_annotated_init(type_: Type[T], data: Any, forge: Forge) -> T:
    """Build an instance of ``type_`` from mapping-like ``data``.

    The parameters of ``type_.__init__`` (as reported by
    ``_clean_annotations``) that are present in ``data`` are converted with
    ``forge(annotation, data[name])``.  The converted values are layered over
    the defaults declared in the ``__init__`` signature and the result is
    passed to ``type_`` as keyword arguments.

    Raises:
        CannotDispatch: ``data`` has no ``items`` attribute.
        ValueError: ``forge`` raised ``TypeError`` for one of the fields; the
            original error is attached as ``__cause__``.
    """
    if not hasattr(data, "items"):
        raise CannotDispatch()

    def forge_or_raise(value, raw):
        # Re-label conversion failures with the offending annotation and raw
        # value, keeping the original TypeError as the explicit cause.
        try:
            return forge(value, raw)
        except TypeError as e:
            raise ValueError(f"Cannot forge {value} from {raw}. {e}") from e

    init = type_.__init__
    forged = {
        name: forge_or_raise(value, data[name])
        for name, value in _clean_annotations(init)
        # "return" is the return annotation, not a constructor argument.
        if name != "return" and name in data
    }

    # Copy before updating: the cached defaults dict is shared between calls.
    kwargs = dict(_cached_defaults(init))
    kwargs.update(forged)
    return type_(**kwargs)
[docs]def forge_union(union: Union, data: Any, forge: Forge) -> Optional[T]: params = union.__args__ # type: ignore if NoneType in params: if data is None: return None if len(params) == 2: first, second = params other = first if second is NoneType else second return forge(other, data) raise NotImplementedError("Unions other as optionals are not supported yet")
def forge_list(type_: Type[List], list_: List, forge: Forge) -> List:
    """Forge every element of ``list_`` according to ``type_``.

    When ``is_bare(type_)`` is true, or the first type argument of ``type_``
    is ``Any``, the elements are copied unchanged into a new list.  Otherwise
    each element is passed through ``forge`` with that first type argument.
    """
    if not is_bare(type_):
        element_type = get_args(type_)[0]
        if element_type is not Any:
            return [forge(element_type, item) for item in list_]
    # Nothing to convert: hand back a shallow copy.
    return list(list_)
def forge_mapping(type_: Type[Mapping], mapping: Mapping, forge: Forge) -> Mapping:
    """Forge the keys and values of ``mapping`` according to ``type_``.

    When ``is_bare(type_)`` is true the items are copied unchanged into a new
    ``dict``.  Otherwise ``type_`` must carry exactly two type arguments; each
    key is forged with the first and each value with the second.  The result
    is always a plain ``dict``.
    """
    if is_bare(type_):
        return dict(mapping)

    key_arg, value_arg = type_.__args__  # type: ignore
    forged = {}
    for raw_key, raw_value in mapping.items():
        # Forge the key before the value, matching dict-display order.
        new_key = forge(key_arg, raw_key)
        forged[new_key] = forge(value_arg, raw_value)
    return forged
def forge_literal(_: Any, literal: str, __: Any) -> str:
    """Return ``literal`` converted with ``str``.

    The first and third arguments are ignored.
    """
    text = str(literal)
    return text
def filter_cammel_case(_: Any, data: Mapping[str, Any]) -> Mapping[str, Any]:
    """Return a copy of ``data`` with every key passed through ``camel_to_snake``.

    The first argument is ignored.  If an ``AttributeError`` is raised at any
    point while building the copy (for instance because ``data`` has no
    ``items`` method), ``data`` itself is returned untouched.
    """
    try:
        renamed = {}
        for key, value in data.items():
            renamed[camel_to_snake(key)] = value
    except AttributeError:
        return data
    return renamed