Source code for lz.iterating

import functools
import itertools
from collections import (OrderedDict,
                         abc,
                         deque)
from operator import is_not
from typing import (Any,
                    Hashable,
                    Iterable,
                    MutableMapping,
                    Sequence,
                    Sized,
                    Tuple,
                    Type)

from .functional import flatmap
from .hints import (Domain,
                    Map,
                    Operator,
                    Range)


[docs]@functools.singledispatch def capacity(iterable: Iterable[Any]) -> int: """ Returns number of elements in iterable. >>> capacity(range(0)) 0 >>> capacity(range(10)) 10 """ counter = itertools.count() # order matters: if `counter` goes first, # then it will be incremented even for empty `iterable` deque(zip(iterable, counter), maxlen=0) return next(counter)
@capacity.register(abc.Sized) def _(iterable: Sized) -> int: """ Returns number of elements in sized iterable. """ return len(iterable)
[docs]def first(iterable: Iterable[Domain]) -> Domain: """ Returns first element of iterable. >>> first(range(10)) 0 """ try: return next(iter(iterable)) except StopIteration as error: raise ValueError('Argument supposed to be non-empty.') from error
[docs]def last(iterable: Iterable[Domain]) -> Domain: """ Returns last element of iterable. >>> last(range(10)) 9 """ try: return deque(iterable, maxlen=1)[0] except IndexError as error: raise ValueError('Argument supposed to be non-empty.') from error
[docs]def cut(iterable: Iterable[Domain], *, slice_: slice) -> Iterable[Domain]: """ Selects elements from iterable based on given slice. Slice fields supposed to be unset or non-negative since it is hard to evaluate negative indices/step for arbitrary iterable which may be potentially infinite or change previous elements if iterating made backwards. """ yield from itertools.islice(iterable, slice_.start, slice_.stop, slice_.step)
[docs]def cutter(slice_: slice) -> Operator[Iterable[Domain]]: """ Returns function that selects elements from iterable based on given slice. >>> to_first_triplet = cutter(slice(3)) >>> list(to_first_triplet(range(10))) [0, 1, 2] >>> to_second_triplet = cutter(slice(3, 6)) >>> list(to_second_triplet(range(10))) [3, 4, 5] >>> cut_out_every_third = cutter(slice(0, None, 3)) >>> list(cut_out_every_third(range(10))) [0, 3, 6, 9] """ result = functools.partial(cut, slice_=slice_) result.__doc__ = ('Selects elements from iterable {slice}.' .format(slice=_slice_to_description(slice_))) return result
def _slice_to_description(slice_: slice) -> str: """Generates human readable representation of `slice` object.""" slice_description_parts = [] start_is_specified = bool(slice_.start) if start_is_specified: slice_description_parts.append('starting from position {start}' .format(start=slice_.start)) step_is_specified = slice_.step is not None if step_is_specified: slice_description_parts.append('with step {step}' .format(step=slice_.step)) if slice_.stop is not None: stop_description_part = ('stopping at position {stop}' .format(stop=slice_.stop)) if start_is_specified or step_is_specified: stop_description_part = 'and ' + stop_description_part slice_description_parts.append(stop_description_part) return ' '.join(slice_description_parts)
[docs]def chopper(size: int) -> Map[Iterable[Domain], Iterable[Sequence[Domain]]]: """ Returns function that splits iterable into chunks of given size. >>> in_three = chopper(3) >>> list(map(tuple, in_three(range(10)))) [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)] """ result = functools.partial(chop, size=size) result.__doc__ = ('Splits iterable into chunks of size {size}.\n' .format(size=size)) return result
[docs]@functools.singledispatch def chop(iterable: Iterable[Domain], *, size: int) -> Iterable[Sequence[Domain]]: """ Splits iterable into chunks of given size. """ iterator = iter(iterable) yield from iter(lambda: tuple(itertools.islice(iterator, size)), ())
@chop.register(abc.Sequence) def _(iterable: Sequence[Domain], *, size: int) -> Iterable[Sequence[Domain]]: """ Splits sequence into chunks of given size. """ if not size: return for start in range(0, len(iterable), size): yield iterable[start:start + size] # deque do not support slice notation chop.register(deque, chop.registry[object]) in_two = chopper(2) in_three = chopper(3) in_four = chopper(4)
[docs]def slide(iterable: Iterable[Domain], *, size: int) -> Iterable[Tuple[Domain, ...]]: """ Slides over iterable with window of given size. """ iterator = iter(iterable) initial = tuple(itertools.islice(iterator, size)) def shift(previous: Tuple[Domain, ...], element: Domain) -> Tuple[Domain, ...]: return previous[1:] + (element,) yield from itertools.accumulate(itertools.chain([initial], iterator), shift)
[docs]def slider(size: int) -> Map[Iterable[Domain], Iterable[Tuple[Domain, ...]]]: """ Returns function that slides over iterable with window of given size. >>> pairwise = slider(2) >>> list(pairwise(range(10))) [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)] """ result = functools.partial(slide, size=size) result.__doc__ = ('Slides over iterable with window of size {size}.' .format(size=size)) return result
pairwise = slider(2) triplewise = slider(3) quadruplewise = slider(4)
[docs]@functools.singledispatch def trail(iterable: Iterable[Domain], *, size: int) -> Iterable[Domain]: """ Selects elements from the end of iterable. Resulted iterable will have size not greater than given one. """ return deque(iterable, maxlen=size)
@trail.register(abc.Sequence) def _(iterable: Sequence[Domain], *, size: int) -> Sequence[Domain]: """ Selects elements from the end of sequence. Resulted sequence will have size not greater than given one. """ return iterable[-size:] if size else iterable[:size] # deque do not support slice notation trail.register(deque, trail.registry[object])
[docs]def trailer(size: int) -> Operator[Iterable[Domain]]: """ Returns function that selects elements from the end of iterable. Resulted iterable will have size not greater than given one. >>> to_last_pair = trailer(2) >>> list(to_last_pair(range(10))) [8, 9] """ result = functools.partial(trail, size=size) result.__doc__ = ('Selects {size} elements from the end of iterable.' .format(size=size)) return result
[docs]def mapper(map_: Map) -> Map[Iterable[Domain], Iterable[Range]]: """ Returns function that applies given map to the each element of iterable. >>> to_str = mapper(str) >>> list(to_str(range(10))) ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] """ return functools.partial(map, map_)
[docs]def flatmapper(map_: Map[Domain, Iterable[Range]] ) -> Map[Iterable[Domain], Iterable[Range]]: """ Returns function that applies map to the each element of iterable and flattens results. >>> relay = flatmapper(range) >>> list(relay(range(5))) [0, 0, 1, 0, 1, 2, 0, 1, 2, 3] """ return functools.partial(flatmap, map_)
Group = Tuple[Hashable, Iterable[Domain]]
[docs]def group_by(iterable: Iterable[Domain], *, key: Map[Domain, Hashable], mapping_cls: Type[MutableMapping]) -> Iterable[Group]: """ Groups iterable elements based on given key. """ groups = mapping_cls() for element in iterable: groups.setdefault(key(element), []).append(element) yield from groups.items()
[docs]def grouper(key: Map[Domain, Hashable], *, mapping_cls: Type[MutableMapping] = OrderedDict ) -> Map[Iterable[Domain], Iterable[Group]]: """ Returns function that groups iterable elements based on given key. >>> group_by_absolute_value = grouper(abs) >>> list(group_by_absolute_value(range(-5, 5))) [(5, [-5]), (4, [-4, 4]), (3, [-3, 3]), (2, [-2, 2]), (1, [-1, 1]), (0, [0])] >>> def modulo_two(number: int) -> int: ... return number % 2 >>> group_by_evenness = grouper(modulo_two) >>> list(group_by_evenness(range(10))) [(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])] """ return functools.partial(group_by, key=key, mapping_cls=mapping_cls)
[docs]def expand(object_: Domain) -> Iterable[Domain]: """ Wraps object into iterable. >>> list(expand(0)) [0] """ yield object_
[docs]def flatten(iterable: Iterable[Iterable[Domain]]) -> Iterable[Domain]: """ Returns plain iterable from iterable of iterables. >>> list(flatten([range(5), range(10, 20)])) [0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] """ yield from itertools.chain.from_iterable(iterable)
[docs]def interleave(iterable: Iterable[Iterable[Domain]]) -> Iterable[Domain]: """ Interleaves elements from given iterable of iterables. >>> list(interleave([range(5), range(10, 20)])) [0, 10, 1, 11, 2, 12, 3, 13, 4, 14, 15, 16, 17, 18, 19] """ iterators = itertools.cycle(map(iter, iterable)) while True: try: for iterator in iterators: yield next(iterator) except StopIteration: is_not_exhausted = functools.partial(is_not, iterator) iterators = itertools.cycle(itertools.takewhile(is_not_exhausted, iterators)) else: return