Source code for yarp.temporal

"""
Temporal filters for :py:class:`Value` values.
"""

import asyncio

from yarp import NoValue, Event, Value, ensure_value, fn
from ._utils import make_same_type, emit_fn, on_value

__names__ = [
    "emit_at",
    "delay",
    "time_window",
    "rate_limit",
]


[docs] def emit_at(time: Value | float | int | None) -> Event: """emit an event at the times given in time Time can be None (no events), a time in seconds as given by loop.time() (emit None at the given time), or a tuple containing the time and the value to emit. Whenever time changes the timer is reloaded, so if the timer for the previous value has not fired, it never will. This also means that if the time changes to be before the current time, there will be one event per change. This is mostly useful in cases where you can calculate the next time that something should happen from some Value. The resulting Event does not depend on ``time``, because it is always emitted in an asyncio callback (never synchronously on ``time`` changes, even if ``time`` is in the past). This makes it safe to modify ``time`` (or one of its inputs) in a callback attached to the result. """ time = ensure_value(time) event = Event() timer = None timer_time = None timer_emit = None loop = asyncio.get_event_loop() def on_timer(): nonlocal timer, timer_time, timer_emit # clear first in case emitting the event causes setup_timer to be called to_emit = timer_emit timer, timer_time, timer_emit = None, None, None event.emit(to_emit) def setup_timer(value): nonlocal timer, timer_time, timer_emit if not isinstance(value, tuple): value = (value, None) new_time, new_emit = value assert new_time is None or isinstance(new_time, (float, int)) if (new_time, new_emit) != (timer_time, timer_emit): if timer is not None: timer.cancel() timer = None timer_time, timer_emit = new_time, new_emit if new_time is not None: timer = loop.call_at(new_time, on_timer) setup_timer(time.value) time.on_value_changed(setup_timer) event._keep_alive = time # XXX return event
[docs] def delay(source, delay_seconds): r""" Produce a time-delayed version of a :py:class:`Value` or :py:class:`Event`. For :py:class:`Value`\ s, the initial value is set immediately. The ``delay_seconds`` argument may be a constant or a Value giving the number of seconds to delay value changes. If it is increased, previously delayed values will be delayed further. If it is decreased, values which should already have been output will be output rapidly one after another. """ delay_seconds = ensure_value(delay_seconds) loop = asyncio.get_event_loop() values_and_times = Value([], inputs=(source, delay_seconds)) output = make_same_type(source, inputs=(delay_seconds,)) emit = emit_fn(output) @on_value(source, current=False) def _(value): values_and_times.value = values_and_times.value + [(value, loop.time())] @fn def next_time(values_and_times, delay_seconds): return values_and_times[0][1] + delay_seconds if values_and_times else None timer = emit_at(next_time(values_and_times, delay_seconds)) @timer.on_event def pop_one(_ev): emit(values_and_times.value[0][0]) values_and_times.value = values_and_times.value[1:] values_and_times.add_input(timer) output.add_input(timer) @delay_seconds.on_value_changed def on_delay_changed(new_delay): # drop and emit values that should have expired already. this is only # needed to make the values come out in the same asyncio time step vt = values_and_times.value.copy() changed = False expiry_time = loop.time() - new_delay while vt and vt[0][1] <= expiry_time: emit(vt.pop(0)[0]) changed = True if changed: values_and_times.value = vt return output
[docs] def time_window(source, duration_seconds): """Produce a moving window over the historical values of a Value or the events of an Event within a given time period. ``duration_seconds`` may be a constant or a Value giving the window duration as a number of seconds. The duration should be a number of seconds greater than zero and never be ``NoValue``. If the value is reduced, previously inserted values will be expired earlier, possibly immediately if they should already have expired. If the value is increased, previously inserted values will have an increased timeout. """ duration_seconds = ensure_value(duration_seconds) loop = asyncio.get_event_loop() # value containing a list of values and the time at which they were last # seen. the time may be None for the current value of a Value values_and_times = Value([], inputs=(source, duration_seconds)) @fn def to_values(values_and_times): return [value for value, time in values_and_times] @fn def next_time_to_pop(values_and_times, duration_seconds): if values_and_times and values_and_times[0][1] is not None: return values_and_times[0][1] + duration_seconds else: return None pop_event = emit_at(next_time_to_pop(values_and_times, duration_seconds)) values_and_times.add_input(pop_event) @pop_event.on_event def pop_one_value(_value): values_and_times.value = values_and_times.value[1:] @duration_seconds.on_value_changed def drop_expired_values(new_duration): expiry_time = loop.time() - new_duration vt = values_and_times.value.copy() while vt and vt[0][1] is not None and vt[0][1] <= expiry_time: vt.pop(0) if len(vt) < len(values_and_times.value): values_and_times.value = vt match source: case Value(): values_and_times.value = [(source.value, None)] @source.on_value_changed def _(new_value): # update the expiry time for the last value first old_value, _old_time = values_and_times.value[-1] values_and_times.value = values_and_times.value[:-1] + [ (old_value, loop.time()), (new_value, None), ] case Event(): @source.on_event def _(event): values_and_times.value = values_and_times.value + [(event, loop.time())] case _: assert False return to_values(values_and_times)
[docs] def rate_limit(source, min_interval=0.1): """Prevent changes occurring above a particular rate, dropping or postponing changes if necessary. ``source`` may be a Value or Event. The ``min_interval`` argument may be a constant or a :py:class:`Value`. If this value is decreased, currently delayed values will be output early (or immediately if the value would have been output previously). If increased, the current delay will be increased. """ loop = asyncio.get_event_loop() # the start time of the current block if one is active block_time = Value(None, inputs=(source,)) has_value = False # was there a value in the current block? next_value = None # if so, what is it? output = make_same_type(source, inputs=(source,), initial_value=NoValue) emit = emit_fn(output) @on_value(source) def _(value): nonlocal has_value, next_value if block_time.value is not None: # blocking, save until end next_value = value has_value = True else: # not blocking, emit and start blocking emit(value) block_time.value = loop.time() @fn def block_end_time(block_time, min_interval): return None if block_time is None else block_time + min_interval block_end_event = emit_at(block_end_time(block_time, min_interval)) @block_end_event.on_event def end_block(_value): nonlocal has_value, next_value if has_value: # had a value in this block, emit it and start a new block emit(next_value) has_value = False next_value = None block_time.value = loop.time() else: # no value in this block, stop blocking block_time.value = None output.add_input(block_end_event) block_time.add_input(block_end_event) return output