Source code for lz.functional

import ast
import functools
import inspect
import itertools
import sys
from collections import abc
from types import (MappingProxyType,
                   MethodType)
from typing import (Any,
                    Callable,
                    Dict,
                    Iterable,
                    Optional,
                    Tuple,
                    Type,
                    Union)

from paradigm import signatures
from reprit import seekers
from reprit.base import generate_repr

from .hints import (Domain,
                    Map,
                    Range)


def identity(argument: Domain) -> Domain:
    """
    Gives back the very object it was called with.

    >>> identity(0)
    0
    """
    return argument
def compose(last_function: Map[Any, Range],
            *front_functions: Callable[..., Any]) -> Callable[..., Range]:
    """
    Returns functions composition.

    The right-most function receives the call arguments,
    each function to the left receives the result of the previous one.
    The location of the ``compose`` call (file name & line number)
    is passed to ``Composition`` as the location of the resulting function.

    >>> sum_of_first_n_natural_numbers = compose(sum, range)
    >>> sum_of_first_n_natural_numbers(10)
    45
    """
    # only the caller's file name & line number are used,
    # so we ask ``inspect`` not to load source code context (``context=0``)
    # for every frame of the stack
    caller_frame_info = inspect.stack(context=0)[1]
    return Composition(last_function, *front_functions,
                       file_path=caller_frame_info.filename,
                       line_number=caller_frame_info.lineno,
                       line_offset=0)
class Composition:
    """
    Callable that chains given functions:
    the last function is called with the original arguments,
    each preceding one is called with the result of its successor.

    The underlying plain function is generated lazily on first use
    (see ``function`` property).
    """

    def __new__(cls,
                *functions: Callable[..., Any],
                **kwargs: Any) -> Union['Composition', Callable[..., Range]]:
        # composition of a single function is the function itself
        if len(functions) == 1:
            return functions[0]
        return super().__new__(cls)

    def __init__(self,
                 *functions: Callable[..., Any],
                 file_path: Optional[str] = None,
                 line_number: int = 0,
                 line_offset: int = 0) -> None:
        """
        :param functions: functions to chain, outermost first;
            nested compositions are flattened.
        :param file_path: file name reported for the generated code,
            defaults to this module's path.
        :param line_number: line number assigned to the generated code.
        :param line_offset: column offset assigned to the generated code.
        """

        def flatten(function: Callable[..., Any]
                    ) -> Iterable[Callable[..., Any]]:
            if isinstance(function, type(self)):
                yield from function.functions
            else:
                yield function

        self._functions = tuple(itertools.chain
                                .from_iterable(map(flatten, functions)))
        # generated on demand by the ``function`` property
        self._function = None
        if file_path is None:
            file_path = __file__
        self._file_path = file_path
        self._line_number = line_number
        self._line_offset = line_offset

    @property
    def functions(self) -> Tuple[Callable[..., Any], ...]:
        """Flattened chain of functions, outermost first."""
        return self._functions

    @property
    def function(self) -> Callable[..., Range]:
        """Plain function performing the whole chain, built on first access."""
        if self._function is None:
            self._function = _compose(*self.functions,
                                      function_name='composition',
                                      file_path=self._file_path,
                                      line_number=self._line_number,
                                      line_offset=self._line_offset)
        return self._function

    def __call__(self, *args: Domain, **kwargs: Domain) -> Range:
        return self.function(*args, **kwargs)

    def __get__(self,
                instance: Domain,
                owner: Type[Domain]) -> Callable[..., Range]:
        # access through the class (``instance is None``) should behave
        # like it does for plain functions and give the object itself:
        # ``MethodType`` refuses to bind to ``None`` with ``TypeError``
        if instance is None:
            return self
        return MethodType(self.function, instance)

    __repr__ = generate_repr(__init__,
                             field_seeker=seekers.complex_)


@signatures.factory.register(Composition)
def _(object_: Composition) -> signatures.Base:
    # call arguments go to the last function of the chain,
    # so its signature is the signature of the whole composition
    return signatures.factory(object_.functions[-1])


def _compose(*functions: Callable[..., Any],
             function_name: str,
             arguments_factory: Callable[..., ast.arguments] =
             ast.arguments
             if sys.version_info < (3, 8)
             # Python3.8 adds positional-only arguments
             else functools.partial(ast.arguments, []),
             module_factory: Callable[..., ast.Module] =
             ast.Module
             if sys.version_info < (3, 8)
             # Python3.8 adds `type_ignores` parameter
             else functools.partial(ast.Module, type_ignores=[]),
             file_path: str,
             line_number: int,
             line_offset: int) -> Callable[..., Range]:
    """
    Builds plain function equivalent to

        def <function_name>(*args, **kwargs):
            return f0(f1(...fn(*args, **kwargs)...))

    by constructing its syntax tree, compiling and executing it
    in a namespace where given functions are bound to generated names.

    :param functions: functions to chain, outermost first.
    :param function_name: name of the generated function.
    :param arguments_factory: version-specific ``ast.arguments`` constructor.
    :param module_factory: version-specific ``ast.Module`` constructor.
    :param file_path: file name passed to ``compile``.
    :param line_number: ``lineno`` set on generated nodes.
    :param line_offset: ``col_offset`` set on generated nodes.
    :returns: generated function.
    """

    def function_to_unique_name(function: Callable) -> str:
        # we are not using `__name__`/`__qualname__` attributes
        # due to their potential non-uniqueness
        return '_' + str(id(function)).replace('-', '_')

    functions_names = list(map(function_to_unique_name, functions))
    # wraps node class into constructor with location attributes preset
    set_attributes = functools.partial(functools.partial,
                                       lineno=line_number,
                                       col_offset=line_offset)
    variadic_positionals_name = 'args'
    variadic_keywords_name = 'kwargs'

    def to_next_call_node(node: ast.Call, name: str) -> ast.Call:
        return set_attributes(ast.Call)(to_name_node(name), [node], [])

    def to_name_node(name: str,
                     *,
                     context_factory: Type[ast.expr_context] = ast.Load
                     ) -> ast.Name:
        return set_attributes(ast.Name)(name, context_factory())

    reversed_functions_names = reversed(functions_names)
    # innermost call: ``fn(*args, **kwargs)``
    calls_node = set_attributes(ast.Call)(
            to_name_node(next(reversed_functions_names)),
            [set_attributes(ast.Starred)(
                    to_name_node(variadic_positionals_name), ast.Load())],
            # since Python3.9 ``keyword`` nodes carry location attributes
            # which are required by ``compile``
            [set_attributes(ast.keyword)(
                    None, to_name_node(variadic_keywords_name))])
    # wrap the rest of the functions around, from inner to outer
    calls_node = functools.reduce(to_next_call_node,
                                  reversed_functions_names,
                                  calls_node)
    function_definition_node = set_attributes(ast.FunctionDef)(
            function_name,
            arguments_factory(
                    [],
                    set_attributes(ast.arg)(variadic_positionals_name, None),
                    [], [],
                    set_attributes(ast.arg)(variadic_keywords_name, None),
                    []),
            [set_attributes(ast.Return)(calls_node)],
            [],
            None)
    tree = module_factory([function_definition_node])
    code = compile(tree, file_path, 'exec')
    namespace = dict(zip(functions_names, functions))
    exec(code, namespace)
    return namespace[function_name]
def combine(*maps: Map) -> Map[Iterable[Domain], Iterable[Range]]:
    """
    Builds function which pairs given maps with elements of its argument
    position by position and applies each map to its element.

    >>> encoder_decoder = combine(str.encode, bytes.decode)
    >>> list(encoder_decoder(['hello', b'world']))
    [b'hello', 'world']
    """
    combination = Combination(*maps)
    return combination
class Combination:
    """Callable applying stored maps to corresponding arguments."""

    def __init__(self, *maps: Map) -> None:
        self.maps = maps

    def __call__(self, arguments: Iterable[Domain]) -> Iterable[Range]:
        # lazily pairs maps with arguments, stops at the shortest of two
        for map_, argument in zip(self.maps, arguments):
            yield map_(argument)

    __repr__ = generate_repr(__init__)


@signatures.factory.register(Combination)
def _(object_: Combination) -> signatures.Base:
    return signatures.factory(object_.__call__)


class ApplierBase(abc.Callable):
    """
    Base for callables storing a function
    together with its positional & keyword arguments,
    mirrors ``functools.partial`` attributes (``func``, ``args``, ``keywords``).
    """

    def __init__(self,
                 function: Callable[..., Range],
                 *args: Domain,
                 **kwargs: Domain) -> None:
        if isinstance(function, type(self)):
            # unwrap applier of the same type:
            # its arguments go first, given keywords take precedence
            wrapped = function
            args = wrapped.args + args
            kwargs = {**wrapped.keywords, **kwargs}
            function = wrapped.func
        self._function, self._args, self._kwargs = function, args, kwargs

    @property
    def func(self) -> Callable[..., Range]:
        return self._function

    @property
    def args(self) -> Tuple[Domain, ...]:
        return self._args

    @property
    def keywords(self) -> Dict[str, Domain]:
        return self._kwargs


ApplierBase.register(functools.partial)


class Curry(ApplierBase):
    """
    Callable collecting arguments call by call
    until the underlying function accepts them.
    """

    def __init__(self,
                 function: Callable[..., Range],
                 signature: signatures.Base,
                 *args: Domain,
                 **kwargs: Domain) -> None:
        super().__init__(function, *args, **kwargs)
        self.signature = signature

    def __call__(self,
                 *args: Domain,
                 **kwargs: Domain) -> Union['Curry', Range]:
        args = self.args + args
        kwargs = {**self.keywords, **kwargs}
        try:
            return self.func(*args, **kwargs)
        except TypeError:
            # presumably ``expects`` tells if arguments fit the signature
            # & ``all_set`` tells if nothing is left to pass
            # -- verify against ``paradigm`` documentation
            awaits_more_arguments = (
                    self.signature.expects(*args, **kwargs)
                    and not self.signature.all_set(*args, **kwargs))
            if not awaits_more_arguments:
                raise
        return type(self)(self.func, self.signature, *args, **kwargs)

    __repr__ = generate_repr(__init__,
                             field_seeker=seekers.complex_)


@signatures.factory.register(Curry)
def _(object_: Curry) -> signatures.Base:
    return object_.signature
def curry(function: Callable[..., Range],
          *,
          signature: Optional[signatures.Base] = None) -> Curry:
    """
    Wraps given function into its curried version.

    If no signature is given it is obtained with ``signatures.factory``.

    >>> curried_pow = curry(pow)
    >>> two_to_power = curried_pow(2)
    >>> two_to_power(10)
    1024
    """
    function_signature = (signatures.factory(function)
                          if signature is None
                          else signature)
    return Curry(function, function_signature)
def pack(function: Callable[..., Range]) -> Map[Iterable[Domain], Range]:
    """
    Adapts given function to accept its positional arguments
    as a single iterable (and, optionally, keyword arguments as a mapping)
    which gets unpacked on call.

    >>> packed_int = pack(int)
    >>> packed_int(['10'])
    10
    >>> packed_int(['10'], {'base': 2})
    2
    """
    packed = functools.partial(apply, function)
    return packed
def apply(function: Callable[..., Range],
          args: Iterable[Domain],
          kwargs: Dict[str, Any] = MappingProxyType({})) -> Range:
    """
    Invokes given function
    unpacking ``args`` as positional & ``kwargs`` as keyword arguments.

    Default for ``kwargs`` is a read-only empty mapping.
    """
    result = function(*args, **kwargs)
    return result
def to_constant(object_: Domain) -> Callable[..., Domain]:
    """
    Builds function which ignores its arguments
    and always gives back given object.

    >>> always_zero = to_constant(0)
    >>> always_zero()
    0
    >>> always_zero(1)
    0
    >>> always_zero(how_about=2)
    0
    """
    constant = Constant(object_)
    return constant
class Constant:
    """Callable returning stored object regardless of passed arguments."""

    def __init__(self, object_: Domain) -> None:
        self.object_ = object_

    def __call__(self, *args: Any, **kwargs: Any) -> Domain:
        # arguments are accepted only to be discarded
        del args, kwargs
        return self.object_

    __repr__ = generate_repr(__init__)


@signatures.factory.register(Constant)
def _(object_: Constant) -> signatures.Base:
    return signatures.factory(object_.__call__)
def flip(function: Callable[..., Range]) -> Callable[..., Range]:
    """
    Builds function which passes its positional arguments
    to given function in reversed order.

    >>> flipped_power = flip(pow)
    >>> flipped_power(2, 4)
    16
    """
    flipped = functools.partial(call_flipped, function)
    return flipped
def call_flipped(function: Callable[..., Range],
                 *args: Domain,
                 **kwargs: Domain) -> Range:
    """
    Invokes given function with positional arguments in reversed order,
    keyword arguments are passed as is.
    """
    flipped_args = reversed(args)
    return function(*flipped_args, **kwargs)
def cleave(*functions: Callable[..., Range]) -> Callable[..., Iterable[Range]]:
    """
    Builds function which calls each of given functions
    with the same arguments and yields their results in order.

    >>> to_min_and_max = cleave(min, max)
    >>> list(to_min_and_max(range(10)))
    [0, 9]
    >>> list(to_min_and_max(range(0), default=None))
    [None, None]
    """
    cleavage = Cleavage(*functions)
    return cleavage
class Cleavage:
    """Callable applying each of stored functions to the same arguments."""

    def __init__(self, *functions: Map) -> None:
        self.functions = functions

    def __call__(self, *args: Domain, **kwargs: Domain) -> Iterable[Range]:
        # results are produced lazily, one function at a time
        for function in self.functions:
            yield function(*args, **kwargs)

    __repr__ = generate_repr(__init__)


@signatures.factory.register(Cleavage)
def _(object_: Cleavage) -> signatures.Base:
    return signatures.factory(object_.__call__)
def flatmap(function: Map[Domain, Iterable[Range]],
            *iterables: Iterable[Domain]) -> Iterable[Range]:
    """
    Maps given function over arguments taken from given iterables
    and yields elements of each result one after another.

    >>> list(flatmap(range, range(5)))
    [0, 0, 1, 0, 1, 2, 0, 1, 2, 3]
    """
    for results in map(function, *iterables):
        yield from results