Source code for traces.timeseries

"""A class for manipulating time series based on measurements at
unevenly-spaced times, see:

http://en.wikipedia.org/wiki/Unevenly_spaced_time_series

"""

import contextlib
import csv
import datetime
import itertools
import warnings

from . import histogram, infinity, operations, plot, utils
from .sorted_dict import SortedDict

NotGiven = object()


class TimeSeries:
    """A class to help manipulate and analyze time series that are the
    result of taking measurements at irregular points in time.

    For example, here would be a simple time series that starts at
    8am and goes to 9:59am:

    >>> ts = TimeSeries()
    >>> ts['8:00am'] = 0
    >>> ts['8:47am'] = 1
    >>> ts['8:51am'] = 0
    >>> ts['9:15am'] = 1
    >>> ts['9:59am'] = 0

    The value of the time series is the last recorded measurement:
    for example, at 8:05am the value is 0 and at 8:48am the value is
    1. So:

    >>> ts['8:05am']
    0
    >>> ts['8:48am']
    1

    There are also a bunch of things for operating on another time
    series: sums, difference, logical operators and such.
    """

    def __init__(self, data=None, default=None):
        """Store ``data`` in a sorted mapping and remember ``default``.

        Args:
            data: Initial measurements, handed unchanged to ``SortedDict``.
            default: Value reported when no measurement applies.
        """
        self._d = SortedDict(data)
        self.default = default

    def __getstate__(self):
        """Return the state to pickle: the items and the default value."""
        return dict(data=self.items(), default=self.default)

    def __setstate__(self, state):
        """Rebuild the instance by re-running ``__init__`` with ``state``."""
        self.__init__(**state)

    def __iter__(self):
        """Iterate over sorted (time, value) pairs."""
        pairs = self.items()
        return iter(pairs)

    def __bool__(self):
        """A series is truthy exactly when its backing mapping is truthy."""
        return bool(self._d)

    def is_empty(self):
        """Return True when the series holds no measurements."""
        return not len(self)

    @staticmethod
    def linear_interpolate(v0, v1, t):
        """Return the point a fraction ``t`` of the way from ``v0`` to ``v1``."""
        delta = v1 - v0
        return v0 + t * delta

    @staticmethod
    def scaled_time(t0, t1, time):
        """Return how far ``time`` lies between ``t0`` and ``t1`` as a ratio."""
        elapsed = time - t0
        span = t1 - t0
        return elapsed / span

    def _get_linear_interpolate(self, time):
        """Return the linearly interpolated value at ``time``.

        Yields ``self.default`` before the first measurement and the last
        recorded value once no measurement lies after ``time``.
        """
        upper = self._d.bisect_right(time)
        lower = upper - 1
        if lower < 0:
            # before first measurement
            return self.default
        if upper == len(self._d):
            # after last measurement
            return self.last_value()
        lower_time, lower_value = self._d.peekitem(lower)
        upper_time, upper_value = self._d.peekitem(upper)
        fraction = self.scaled_time(lower_time, upper_time, time)
        return self.linear_interpolate(lower_value, upper_value, fraction)

    def _get_previous(self, time):
        """Get the value of the latest measurement at or before 'time'.

        Args:
            time: The time at which to get the value

        Returns:
            The value of the most recent measurement whose time is at or
            before 'time' (``bisect_right`` is used, so an exact match
            counts), or the default value if there is no such measurement.
        """
        position = self._d.bisect_right(time)
        if position <= 0:
            # No measurements at or before 'time'
            return self.default
        # There is at least one measurement at or before 'time'
        _, value = self._d.peekitem(position - 1)
        return value
def get(self, time, interpolate="previous"):
    """Return the value of the time series at an arbitrary time.

    The time does not have to be a measurement time; ``interpolate``
    selects how the value is derived from the measurements.

    Args:
        time: The time at which to get the value.
        interpolate (str): Either "previous" (value of the most recent
            measurement, delegated to ``_get_previous``) or "linear"
            (delegated to ``_get_linear_interpolate``).

    Returns:
        Whatever the selected helper returns for ``time``.

    Raises:
        ValueError: If ``interpolate`` is neither "previous" nor "linear".

    Examples:
        >>> ts = TimeSeries()
        >>> ts[0] = 0
        >>> ts[10] = 10
        >>> ts.get(5)  # previous measurement
        0
        >>> ts.get(5, interpolate="linear")
        5.0
    """
    if interpolate == "previous":
        return self._get_previous(time)
    if interpolate == "linear":
        return self._get_linear_interpolate(time)
    raise ValueError(
        f"unknown value '{interpolate}' for interpolate, "
        f"valid values are in [previous, linear]"
    )
def get_item_by_index(self, index):
    """Return the (time, value) pair found at position ``index``.

    The lookup is delegated to ``peekitem`` of the backing sorted mapping.
    """
    pair = self._d.peekitem(index)
    return pair
def last_item(self):
    """Return the final (time, value) pair, i.e. the item at index -1."""
    final_index = -1
    return self.get_item_by_index(final_index)
def last_key(self):
    """Return the time component of the last (time, value) pair."""
    item = self.last_item()
    return item[0]
def last_value(self):
    """Return the value component of the last (time, value) pair."""
    item = self.last_item()
    return item[1]
def first_item(self):
    """Return the earliest (time, value) pair, i.e. the item at index 0."""
    first_index = 0
    return self.get_item_by_index(first_index)
def first_key(self):
    """Return the time component of the first (time, value) pair."""
    item = self.first_item()
    return item[0]
def first_value(self):
    """Return the value component of the first (time, value) pair."""
    item = self.first_item()
    return item[1]
def set(self, time, value, compact=False):
    """Record ``value`` as the measurement at ``time``.

    Args:
        time: Time of the measurement.
        value: Value to store.
        compact: When true and the series is not empty, the measurement
            is only stored if ``self.get(time)`` differs from ``value``.
            An empty series always stores the measurement.
    """
    should_store = True
    if compact and len(self) > 0:
        # Skip the write when the series already evaluates to `value`.
        should_store = self.get(time) != value
    if should_store:
        self._d[time] = value
def set_many(self, data, compact=False):
    """Store many measurements with a single ``update`` of the backing map.

    Args:
        data: An iterable of (time, value) pairs or a dictionary mapping
            times to values.
        compact: If True, an entry is dropped when its value equals the
            value of the entry just before it in ``data``; the first
            entry is compared against ``self.default``. Only meaningful
            when ``data`` is in time-sorted order. Measurements already
            stored in the series are not consulted.
    """
    if not compact:
        self._d.update(data)
        return

    pairs = data.items() if isinstance(data, dict) else data
    kept = []
    last_seen = self.default
    for time, value in pairs:
        if value != last_seen:
            kept.append((time, value))
        last_seen = value
    self._d.update(kept)
def set_interval(self, start, end, value, compact=False):
    """Set the value of the time series on the interval [start, end).

    Args:
        start: The start time of the interval, inclusive.
        end: The end time of the interval, exclusive.
        value: The value to set within the interval.
        compact (optional): Forwarded to ``set`` for both boundary
            points, so a boundary is only written if it changes the
            value there. Defaults to False.

    Raises:
        ValueError: If ``start`` is equal to or after ``end``.

    Example:
        >>> ts = TimeSeries(data=[(1, 5), (3, 2), (5, 4), (6, 1)])
        >>> ts.set_interval(2, 6, 3)
        >>> ts
        TimeSeries(default=None, {1: 5, 2: 3, 6: 1})

    Note:
        Measurement points strictly between ``start`` and ``end`` are
        removed rather than overwritten; the value the series had at
        ``end`` is looked up beforehand and written back at ``end``.
    """
    if start >= end:
        raise ValueError(
            f"start must be less than end, got start={start!r} and end={end!r}"
        )

    # Capture the value at `end` before interior points disappear.
    value_at_end = self[end]

    backing = self._d
    open_interval = (False, False)
    if hasattr(backing, "delete_range"):
        backing.delete_range(start, end, inclusive=open_interval)
    else:
        # Snapshot the keys first: we delete while walking them.
        interior_times = list(backing.irange(start, end, inclusive=open_interval))
        for interior_time in interior_times:
            del backing[interior_time]

    self.set(start, value, compact)
    self.set(end, value_at_end, compact)
def compact(self):
    """Drop measurements that repeat the value of the one before them.

    The value of the series is unchanged at all times, but consecutive
    measurements with equal values are collapsed onto the first of the
    run. The series is modified in place and nothing is returned.

    Example:
        >>> ts = TimeSeries(data=[(1, 5), (2, 5), (5, 5), (6, 1)])
        >>> ts.compact()
        >>> ts
        TimeSeries(default=None, {1: 5, 6: 1})
    """
    # A fresh object never equals a stored value, so the first
    # measurement is always kept.
    last_value = object()
    to_delete = []
    for time, value in self:
        if value == last_value:
            to_delete.append(time)
        last_value = value

    # Delete only after the walk, so iteration is never disturbed.
    for time in to_delete:
        del self[time]
def items(self):
    """Return the (time, value) pairs of the series as 2-tuples.

    The result is whatever ``items()`` of the backing sorted mapping
    returns.
    """
    pairs = self._d.items()
    return pairs
def exists(self):
    """Deprecated alias of `is_not_none()`.

    Emits a ``DeprecationWarning`` (attributed to the caller through
    ``stacklevel=2``) and then returns ``self.is_not_none()``.

    Returns:
        The result of ``is_not_none()``: a series that is True where
        the original value is not None and False where it is None.

    Examples:
        >>> ts = TimeSeries()
        >>> ts[0] = "data"
        >>> ts[1] = None
        >>> exists_ts = ts.exists()
        >>> exists_ts[0]
        True
        >>> exists_ts[1]
        False
    """
    message = "The 'exists' method is deprecated. Use 'is_not_none' instead."
    warnings.warn(message, category=DeprecationWarning, stacklevel=2)
    return self.is_not_none()
def is_not_none(self):
    """Return a new TimeSeries flagging where this one is not None.

    Every measurement time of this series is carried over; its value
    becomes ``True`` when the original value is not None and ``False``
    otherwise. The default of the result is derived from
    ``self.default`` in the same way.

    Returns:
        TimeSeries: A new TimeSeries with boolean values.

    Examples:
        >>> ts = TimeSeries()
        >>> ts[0] = "data"
        >>> ts[1] = None
        >>> ts[2] = 42
        >>> flags = ts.is_not_none()
        >>> flags[0]
        True
        >>> flags[1]
        False
        >>> flags[2]
        True
    """
    default_flag = self.default is not None
    flags = TimeSeries(default=default_flag)
    for time, value in self:
        flags[time] = value is not None
    return flags
def remove(self, time):
    """Delete the measurement recorded at ``time``.

    Raises:
        KeyError: If ``time`` is not a measurement point. The error
            carries the message "no measurement at <time>" and is
            chained to the original lookup failure.
    """
    try:
        del self._d[time]
    except KeyError as lookup_error:
        raise KeyError(f"no measurement at {time}") from lookup_error
def remove_points_from_interval(self, start, end):
    """Remove all measurement points within a time interval.

    Every period start reported by ``iterperiods(start, end)`` is
    deleted from the backing mapping. Unlike `remove()`, a period start
    that is not an actual measurement point (typically ``start``
    itself) is skipped silently instead of raising KeyError.

    Args:
        start: The start time of the interval (inclusive)
        end: The end time of the interval (exclusive)

    Examples:
        >>> ts = TimeSeries()
        >>> ts[0] = 0
        >>> ts[5] = 5
        >>> ts[10] = 10
        >>> ts.remove_points_from_interval(4, 7)
        >>> # Now ts contains only points at t=0 and t=10
    """
    # `iterperiods` is a lazy generator that reads `self._d` while it
    # runs. Deleting keys between two of its steps shifts the positions
    # it is walking over and can make it skip points, so collect every
    # period start before the first deletion.
    period_starts = [s for s, _e, _v in self.iterperiods(start, end)]
    for period_start in period_starts:
        with contextlib.suppress(KeyError):
            del self._d[period_start]
def n_measurements(self):
    """Return how many measurements the backing mapping holds."""
    count = len(self._d)
    return count
def __len__(self):
    """Number of points in the TimeSeries."""
    return self.n_measurements()


def __repr__(self):
    """A detailed string representation for debugging.

    Returns:
        str: The class name, the default value and a dict of all
        time-value pairs, e.g. ``TimeSeries(default=None, {1: 5})``.
    """
    items = dict(self._d.items())
    return f"{type(self).__name__}(default={self.default!r}, {items!r})"


def __str__(self):
    """A human-readable string representation (truncated if too long).

    Shows the class name, the default value and the time-value pairs.
    With more than 20 items, the first and last 10 are always shown and
    the middle section is replaced by a ``<...N items...>`` marker when
    that marker is shorter than the text it stands for.

    Returns:
        str: A formatted string representation of the TimeSeries
    """
    max_items = 20
    half = max_items // 2

    def render(pairs):
        return ", ".join("{!s}: {!s}".format(*pair) for pair in pairs)

    pairs = self._d.items()
    n_items = len(self)
    if n_items > max_items:
        head = render(pairs[:half])
        middle = render(pairs[half:-half])
        tail = render(pairs[-half:])
        marker = f"<...{n_items - max_items} items...>"
        # Only abbreviate when that actually makes the output shorter.
        if len(marker) < len(middle):
            middle = marker
        body = ", ".join([head, middle, tail])
    else:
        body = render(pairs)
    return f"{type(self).__name__}(default={self.default!r}, {{{body}}})"
[docs] def iterintervals(self, n=2): """Iterate over groups of `n` consecutive measurement points in the time series. """ # tee the original iterator into n identical iterators streams = itertools.tee(iter(self), n) # advance the "cursor" on each iterator by an increasing # offset, e.g. if n=3: # # [a, b, c, d, e, f, ..., w, x, y, z] # first cursor --> * # second cursor --> * # third cursor --> * for stream_index, stream in enumerate(streams): for _ in range(stream_index): next(stream) # now, zip the offset streams back together to yield tuples, # in the n=3 example it would yield: # (a, b, c), (b, c, d), ..., (w, x, y), (x, y, z) yield from zip(*streams, strict=False)
@staticmethod
def _value_function(value):
    """Turn a ``value`` filter argument into a period predicate.

    Args:
        value: ``None`` (accept every period), a callable taking
            ``(t0, t1, value)`` (used as is), or any other constant
            (accept periods whose value equals it).

    Returns:
        A callable ``(t0, t1, value) -> bool``-like result.
    """
    # todo: should this be able to take NotGiven, so that it would
    # be possible to filter for None explicitly?

    # None means "don't filter"
    if value is None:

        def accept_all(t0_, t1_, value_):
            return True

        return accept_all

    # a function is used directly as the filter
    if callable(value):
        return value

    # any other constant keeps only the periods whose value equals it
    def equals_constant(t0_, t1_, value_):
        return value_ == value

    return equals_constant
def iterperiods(self, start=None, end=None, value=None):
    """Iterate over the periods of the series within a time span.

    Args:
        start: Start of the span; normalised by ``_check_boundaries``.
        end: End of the span; normalised by ``_check_boundaries``.
        value: Optional filter, interpreted by ``_value_function``.

    Yields:
        ``(period start, period end, value)`` tuples for each period
        accepted by the filter. The first period begins at ``start``
        and a final period running up to ``end`` is emitted when the
        last measurement inside the span lies before ``end``.
    """
    # todo: add mask argument here.
    # todo: check whether this can be simplified with newer SortedDict
    start, end, _ = self._check_boundaries(start, end, allow_infinite=False)
    keep = self._value_function(value)

    # index of the first measurement after `start`, and the value in
    # effect at `start`
    first_index = self._d.bisect_right(start)
    if first_index:
        _, current_value = self._d.peekitem(first_index - 1)
    else:
        current_value = self.default

    # index just past the last measurement at or before `end`
    stop_index = self._d.bisect_right(end)

    period_start = start
    for period_end in self._d.islice(first_index, stop_index):
        if keep(period_start, period_end, current_value):
            yield period_start, period_end, current_value

        # the end of this period is the start of the next one
        period_start = period_end
        current_value = self[period_start]

    # the period between the last measurement in the span and `end`
    if period_start < end and keep(period_start, end, current_value):
        yield period_start, end, current_value
def slice(self, start, end):
    """Return an equivalent TimeSeries restricted to ``start``..``end``.

    The result always has a point at ``start``. For every period
    reported by ``iterperiods(start, end)`` the period start is stored
    with the period's value and the period end with the value this
    series has there. The default value is carried over.
    """
    start, end, _ = self._check_boundaries(start, end, allow_infinite=True)

    sliced = TimeSeries(default=self.default)
    for period_start, period_end, value in self.iterperiods(start, end):
        sliced[period_start] = value
        sliced[period_end] = self[period_end]
    return sliced
def _check_regularization(self, start, end, sampling_period=None):
    """Validate ``sampling_period`` and match its type to ``start``.

    Args:
        start: Start of the span being sampled.
        end: End of the span being sampled.
        sampling_period: ``None``, a ``datetime.timedelta`` or a number
            (passed to ``datetime.timedelta(seconds=...)``).

    Returns:
        ``None`` when no sampling period is given; otherwise a
        ``timedelta`` when ``start`` is a ``datetime.date`` instance
        and the plain seconds value in every other case.

    Raises:
        ValueError: If the period is not positive, or is longer than
            the duration between ``start`` and ``end``.
    """
    # nothing to check without a sampling period
    if sampling_period is None:
        return sampling_period

    # keep both a seconds and a timedelta form for the checks below
    if isinstance(sampling_period, datetime.timedelta):
        period_seconds = sampling_period.total_seconds()
        period_timedelta = sampling_period
    else:
        period_seconds = sampling_period
        period_timedelta = datetime.timedelta(seconds=sampling_period)

    if period_seconds <= 0:
        raise ValueError("sampling_period must be > 0")

    if period_seconds > utils.duration_to_number(end - start):
        raise ValueError(
            "sampling_period "
            "is greater than the duration between "
            "start and end."
        )

    if isinstance(start, datetime.date):
        return period_timedelta
    return period_seconds
def sample(
    self,
    sampling_period,
    start=None,
    end=None,
    interpolate="previous",
    mask=None,
):
    """Sample the series at regular time periods.

    Args:
        sampling_period: Distance between two samples; validated by
            ``_check_regularization``.
        start: First sampling time; normalised by ``_check_boundaries``.
        end: Upper bound for sampling times (inclusive); normalised by
            ``_check_boundaries``.
        interpolate: Interpolation method handed to ``get``.
        mask: Optional domain on which to sample. It is forwarded to
            ``_check_boundaries``; sampling restarts at the beginning
            of every period where the resulting mask is True.

    Returns:
        A list of ``(time, value)`` tuples.
    """
    # The caller's mask has to be passed on here: without it the
    # argument was ignored and the whole span was always sampled.
    start, end, mask = self._check_boundaries(start, end, mask=mask)
    sampling_period = self._check_regularization(start, end, sampling_period)

    result = []
    for period_start, period_end, _ in mask.iterperiods(value=True):
        current_time = period_start
        while current_time <= period_end:
            value = self.get(current_time, interpolate=interpolate)
            result.append((current_time, value))
            current_time += sampling_period
    return result
def sample_interval(  # noqa: C901
    self,
    sampling_period=None,
    start=None,
    end=None,
    idx=None,
    operation="mean",
):
    """Sample on intervals by aggregating with some operation
    (mean, max, min).

    It can be called either with sampling_period, [start], [end] or
    with an idx as a DatetimeIndex. The returned pandas.Series is
    indexed on ``idx[:-1]``, where ``idx`` is either the one given or
    ``pandas.date_range(start, end, freq=sampling_period,
    inclusive="both")``.

    :param sampling_period: the sampling period
    :param start: the start time of the sampling
    :param end: the end time of the sampling
    :param idx: a DatetimeIndex with the start times of the intervals
    :param operation: "mean", "max" or "min"; any other value raises
        KeyError from the operator lookup table
    :return: a pandas Series with the Trace sampled
    :raises ImportError: if pandas is not installed
    """
    try:
        import pandas as pd
    except ImportError as error:
        msg = "sample_interval need pandas to be installed"
        raise ImportError(msg) from error

    if idx is None:
        start, end, mask = self._check_boundaries(start, end)
        sampling_period = self._check_regularization(
            start, end, sampling_period
        )
        # create index on [start, end)
        idx = pd.date_range(
            start, end, freq=sampling_period, inclusive="both"
        )
    else:
        start, end, mask = self._check_boundaries(idx[0], idx[-1])

    idx_list = idx.values  # list(idx)

    # create all inflexion points
    def items_in_horizon():
        # yields all items between start and end as well as start and end
        yield (start, self[start])
        for t, v in self.items():
            if t <= start:
                continue
            if t >= end:
                break
            yield t, v
        yield (end, self[end])

    inflexion_times, inflexion_values = zip(
        *items_in_horizon(), strict=False
    )
    inflexion_times = pd.DatetimeIndex(inflexion_times)

    # identify all inflexion intervals: for each inflexion time, the
    # position in idx of the interval start at or before it (ffill)
    inflexion_intervals = idx.get_indexer(inflexion_times, method="ffill")

    # convert DatetimeIndex to numpy array for faster indexation
    inflexion_times = inflexion_times.values

    # position of the last entry of idx; reaching it ends the loop below
    Np1 = len(idx_list) - 1

    # convert to timestamp
    # (to make interval arithmetic faster, no need for total_seconds)
    inflexion_times = inflexion_times.astype("int64")
    idx_times = idx.astype("int64")

    # initialise init, update and finish functions depending
    # on the aggregation operator
    init, update, finish = {
        "mean": (
            lambda t, v: 0.0,
            lambda agg, t0, t1, v: agg + (t1 - t0) * v,
            lambda agg, t_start, t_end: agg / (t_end - t_start),
        ),
        "max": (
            lambda t, v: v,
            lambda agg, t0, t1, v: max(agg, v),
            lambda agg, t_start, t_end: agg,
        ),
        "min": (
            lambda t, v: v,
            lambda agg, t0, t1, v: min(agg, v),
            lambda agg, t_start, t_end: agg,
        ),
    }[operation]

    # initialise first interval
    t_start, t_end = idx_times[0:2]
    i0, t0, v0 = 0, t_start, self[start]
    agg = init(t0, v0)
    result = []
    for i1, t1, v1 in zip(
        inflexion_intervals, inflexion_times, inflexion_values, strict=False
    ):
        if i0 != i1:
            # change of interval

            # finish previous interval
            agg = update(agg, t0, t_end, v0)
            agg = finish(agg, t_start, t_end)
            result.append((idx_list[i0], agg))

            # handle all intervals between t_end and t1
            if i1 != i0 + 1:
                result.append((idx_list[i0 + 1], v0))

            # if last_point, break
            if i1 == Np1:
                break

            # set up new interval
            t_start, t_end = idx_times[i1 : i1 + 2]
            i0, t0 = i1, t_start
            agg = init(t0, v0)

        agg = update(agg, t0, t1, v0)
        i0, t0, v0 = i1, t1, v1

    # intervals that received no record of their own are filled from
    # the preceding interval by the reindex + ffill
    df = pd.DataFrame.from_records(result)
    return df.set_index(0).iloc[:, 0].reindex(idx[:-1]).ffill()
def moving_average(  # noqa: C901
    self,
    sampling_period,
    window_size=None,
    start=None,
    end=None,
    placement="center",
    pandas=False,
):
    """Average the series over a window moved in regular steps.

    Args:
        sampling_period: Step between two consecutive averages.
        window_size: Width of the averaging window; defaults to
            ``sampling_period``.
        start: First sampling time; normalised by ``_check_boundaries``.
        end: Last possible sampling time (inclusive).
        placement: Where the window sits relative to the sampling
            time: "center", "left" (window starts at the time) or
            "right" (window ends at the time).
        pandas: If True, return a ``pandas.Series`` instead of a list.

    Returns:
        A list of ``(time, mean)`` tuples, or a ``pandas.Series``
        indexed by time when ``pandas`` is True. A mean is ``None``
        when ``self.mean`` raises a TypeError mentioning "NoneType".

    Raises:
        ValueError: For an unknown ``placement``.
        ImportError: If ``pandas`` is True but pandas is missing.
    """
    start, end, _ = self._check_boundaries(start, end)

    # default to sampling_period if not given
    if window_size is None:
        window_size = sampling_period

    sampling_period = self._check_regularization(start, end, sampling_period)

    # convert to datetime if the times are datetimes
    full_window = window_size * 1.0
    half_window = full_window / 2
    times_are_dates = isinstance(start, datetime.date)
    if times_are_dates and not isinstance(full_window, datetime.timedelta):
        half_window = datetime.timedelta(seconds=half_window)
        full_window = datetime.timedelta(seconds=full_window)

    averages = []
    current_time = start
    while current_time <= end:
        if placement == "center":
            window = (current_time - half_window, current_time + half_window)
        elif placement == "left":
            window = (current_time, current_time + full_window)
        elif placement == "right":
            window = (current_time - full_window, current_time)
        else:
            raise ValueError(f'unknown placement "{placement}"')

        # calculate mean over window and add (t, v) tuple to list
        try:
            mean = self.mean(*window)
        except TypeError as error:
            if "NoneType" not in str(error):
                raise
            mean = None
        averages.append((current_time, mean))
        current_time += sampling_period

    if not pandas:
        return averages

    # convert to pandas Series if pandas=True
    try:
        import pandas as pd
    except ImportError as error:
        msg = "can't have pandas=True if pandas is not installed"
        raise ImportError(msg) from error

    return pd.Series(
        [mean for _, mean in averages],
        index=[time for time, _ in averages],
    )
@staticmethod
def rebin(binned, key_function):
    """Merge bins whose starts map to the same new bin start.

    Args:
        binned: Mapping of bin start to value.
        key_function: Maps an old bin start to its new bin start.

    Returns:
        A ``SortedDict`` from new bin start to the ``+=``-accumulated
        values of all old bins that map onto it.
    """
    rebinned = SortedDict()
    for old_start, amount in binned.items():
        new_start = key_function(old_start)
        try:
            rebinned[new_start] += amount
        except KeyError:
            # first contribution to this bin
            rebinned[new_start] = amount
    return rebinned


def bin(
    self,
    unit,
    n_units=1,
    start=None,
    end=None,
    mask=None,
    smaller=None,
    transform="distribution",
):
    """Apply ``transform`` to consecutive datetime bins.

    Args:
        unit: Bin unit handed to ``utils.datetime_floor`` and
            ``utils.datetime_range``.
        n_units: Number of units per bin.
        start: Start of the span; floored to a bin boundary.
        end: End of the span.
        mask: Optional domain, forwarded to the transform. An empty
            mask yields an empty result.
        smaller: Optional, already computed finer-grained bins; when
            truthy they are re-binned instead of computing anew.
        transform: Name of the method called as
            ``method(bin_start, bin_end, mask=mask, normalized=False)``.

    Returns:
        A ``SortedDict`` mapping each bin start to the transform result.
    """
    nothing_to_bin = (mask is not None and mask.is_empty()) or (
        start is not None and start == end
    )
    if nothing_to_bin:
        return SortedDict()

    # use smaller if available
    if smaller:
        return self.rebin(
            smaller,
            lambda x: utils.datetime_floor(x, unit, n_units),
        )

    start, end, mask = self._check_boundaries(start, end, mask=mask)
    start = utils.datetime_floor(start, unit=unit, n_units=n_units)

    function = getattr(self, transform)
    binned = SortedDict()
    boundaries = utils.datetime_range(start, end, unit, n_units=n_units)
    for bin_start, bin_end in utils.pairwise(boundaries):
        binned[bin_start] = function(
            bin_start, bin_end, mask=mask, normalized=False
        )
    return binned
def mean(self, start=None, end=None, mask=None, interpolate="previous"):
    """Return the average value of the time series over the given time
    range from `start` to `end`, when `mask` is truthy.

    All arguments are forwarded to ``distribution``; the result is the
    ``mean()`` of the distribution it returns.
    """
    values = self.distribution(
        start=start, end=end, mask=mask, interpolate=interpolate
    )
    return values.mean()
def distribution(
    self,
    start=None,
    end=None,
    normalized=True,
    mask=None,
    interpolate="previous",
):
    """Calculate the distribution of values over the given time range
    from `start` to `end`.

    Args:
        start (orderable, optional): The lower time bound of when to
            calculate the distribution. By default, the first time
            point will be used.
        end (orderable, optional): The upper time bound of when to
            calculate the distribution. By default, the last time
            point will be used.
        normalized (bool): If True, distribution will sum to one. If
            False and the time values of the TimeSeries are datetimes,
            the units will be seconds.
        mask (:obj:`TimeSeries`, optional): A domain on which to
            calculate the distribution.
        interpolate (str, optional): Method for interpolating between
            measurement points: either "previous" (default) or
            "linear". Note: if "previous" is used, then the resulting
            histogram is exact. If "linear" is given, then the values
            used for the histogram are the average value for each
            segment -- the mean of this histogram will be exact, but
            higher moments (variance) will be approximate.

    Returns:
        :obj:`Histogram` with the results.
    """
    start, end, mask = self._check_boundaries(start, end, mask=mask)

    counter = histogram.Histogram()
    for window_start, window_end, _ in mask.iterperiods(value=True):
        for t0, t1, _ in self.iterperiods(window_start, window_end):
            weight = utils.duration_to_number(t1 - t0, units="seconds")
            # each period is represented by its value at the midpoint
            midpoint = utils.time_midpoint(t0, t1)
            observed = self.get(midpoint, interpolate=interpolate)
            counter[observed] += weight

    # divide by total duration if result needs to be normalized
    return counter.normalized() if normalized else counter
def n_points(
    self,
    start=-infinity.inf,
    end=+infinity.inf,
    mask=None,
    include_start=True,
    include_end=False,
    normalized=False,
):
    """Calculate the number of points over the given time range from
    `start` to `end`.

    Args:
        start (orderable, optional): The lower time bound of when to
            count points. By default, start is -infinity.
        end (orderable, optional): The upper time bound of when to
            count points. By default, the end is +infinity.
        mask (:obj:`TimeSeries`, optional): A domain on which to
            count points.
        include_start (bool): Count a point lying exactly on the start
            of a mask period.
        include_end (bool): Count a point lying exactly on the end of
            a mask period.
        normalized (bool): If True, divide the count by the total
            number of measurements.

    Returns:
        `int` with the count (the quotient of a true division when
        ``normalized`` is True); 0 for a series without measurements.
    """
    # just go ahead and return 0 if we already know it regardless
    # of boundaries
    if not self.n_measurements():
        return 0

    start, end, mask = self._check_boundaries(start, end, mask=mask)

    # choose the bisection side once, according to the inclusivity flags
    position_of_end = self._d.bisect_right if include_end else self._d.bisect_left
    position_of_start = (
        self._d.bisect_left if include_start else self._d.bisect_right
    )

    count = 0
    for window_start, window_end, _ in mask.iterperiods(value=True):
        count += position_of_end(window_end) - position_of_start(window_start)

    if normalized:
        count /= self.n_measurements()
    return count
def _check_time_series(self, other):
    """Raise an informative error if ``other`` is not a TimeSeries.

    Args:
        other: The object to check.

    Raises:
        TypeError: If ``other`` is not a ``TimeSeries`` instance. The
            message names the types of both operands.
    """
    if not isinstance(other, TimeSeries):
        # Wording follows Python's own "unsupported operand type(s)".
        msg = f"unsupported operand type(s) for +: {type(self)} and {type(other)}"
        raise TypeError(msg)
@staticmethod
def iter_merge_transitions(timeseries_list):
    """Yield (time, index, previous_value, new_value) for each
    transition across all timeseries.

    Unlike iter_merge, this yields individual transitions and does not
    copy the full state list at each step.

    Uses a flat-sort strategy: all transitions are extracted into a
    single sequence and sorted once, rather than using a priority
    queue. See docs/merge_strategies.rst for the alternatives and
    tradeoffs the project documents.

    Args:
        timeseries_list: An iterable of TimeSeries objects.

    Yields:
        Tuples of (time, index, previous_value, new_value) where:
            - time: the time of the transition
            - index: which timeseries changed
            - previous_value: the value before the transition (the
              series default before its first measurement)
            - new_value: the value after the transition
    """
    timeseries_list = list(timeseries_list)

    # Flat-sort strategy: gather every (time, index, value) and sort
    # once; ties on time are ordered by series index.
    transitions = sorted(
        (time, index, value)
        for index, series in enumerate(timeseries_list)
        for time, value in series
    )

    current = [series.default for series in timeseries_list]
    for time, index, new_value in transitions:
        old_value = current[index]
        current[index] = new_value
        yield time, index, old_value, new_value
@classmethod
def iter_merge(cls, timeseries_list):
    """Iterate through several time series in order, yielding
    (time, list) tuples where list is the values of each individual
    TimeSeries in the list at time t.

    One tuple is produced per distinct time, after all transitions
    happening at that time have been applied.

    Note: yields a full K-element list copy at each unique time. For
    large K, consider iter_merge_transitions which yields individual
    transitions instead.
    """
    timeseries_list = list(timeseries_list)
    if not timeseries_list:
        return

    state = [series.default for series in timeseries_list]
    transitions = cls.iter_merge_transitions(timeseries_list)

    # Transitions arrive sorted by time, so simultaneous ones are
    # adjacent and can be applied as one group.
    for time, simultaneous in itertools.groupby(
        transitions, key=lambda transition: transition[0]
    ):
        for _time, index, _previous, value in simultaneous:
            state[index] = value
        yield time, list(state)
@classmethod
def merge(cls, ts_list, compact=True, operation=None):
    """Merge several time series into a single new one.

    The series are walked in time order with ``iter_merge``. At every
    time t the merged value is either the list of the values of each
    individual TimeSeries at t (in the same order as in ``ts_list``)
    or the result of the optional ``operation`` on that list.

    Args:
        ts_list: An iterable of TimeSeries objects; a one-shot
            iterator such as a generator is fine.
        compact: Forwarded to ``set`` for every merged point.
        operation: Optional callable applied to each list of values.

    Returns:
        A new instance of ``cls``. Its default is the list of all
        input defaults, or ``operation`` applied to that list.
    """
    # The inputs are read twice (defaults, then the merge itself), so
    # materialise them first; otherwise a generator would be exhausted
    # by the first pass and the merge would silently come back empty.
    ts_list = list(ts_list)

    # If operation is not given then the default is the list
    # of defaults of all time series
    # If operation is given, then the default is the result of
    # the operation over the list of all defaults
    default = [ts.default for ts in ts_list]
    if operation is not None:
        default = operation(default)

    result = cls(default=default)
    for t, merged in cls.iter_merge(ts_list):
        value = merged if operation is None else operation(merged)
        result.set(t, value, compact=compact)
    return result
@staticmethod
def _flush_pending(pending, t, counts, result_data):
    """Append the current count at time ``t`` for every touched value.

    ``pending`` holds ``(previous_value, new_value)`` pairs. Every
    value appearing on either side gets one ``(t, count)`` entry
    appended to its list in ``result_data`` (created on demand); a
    value missing from ``counts`` is recorded as 0.
    """
    touched = set()
    for before, after in pending:
        touched.add(before)
        touched.add(after)

    for value in touched:
        history = result_data.setdefault(value, [])
        history.append((t, counts.get(value, 0)))
@classmethod
def count_by_value(cls, ts_list):
    """Count, over time, how many input series hold each value.

    Intended for many series with few discrete states (e.g. boolean
    on/off, ticket open/closed). Works from ``iter_merge_transitions``
    so no per-step copy of the full state list is made; see
    docs/merge_strategies.rst.

    Args:
        ts_list: An iterable of TimeSeries objects.

    Returns:
        A dict keyed by every distinct value seen (defaults included).
        Each entry is a TimeSeries whose value at time t is the number
        of input series equal to that key at time t. An empty input
        gives an empty dict.
    """
    series = list(ts_list)
    if not series:
        return {}

    # How many series start out (by default) in each state.
    baseline = {}
    for ts in series:
        baseline[ts.default] = baseline.get(ts.default, 0) + 1

    running = dict(baseline)  # live tally, updated per transition
    history = {}  # value -> list of (time, count)

    batch = []  # transitions sharing the current time
    batch_time = object()
    for moment, _position, old, new in cls.iter_merge_transitions(series):
        # Counts are recorded only once every transition at a given
        # time has been applied.
        if moment != batch_time and batch:
            cls._flush_pending(batch, batch_time, running, history)
            batch = []
        running[old] = running.get(old, 0) - 1
        running[new] = running.get(new, 0) + 1
        batch.append((old, new))
        batch_time = moment

    if batch:
        cls._flush_pending(batch, batch_time, running, history)

    result = {}
    for value in set(baseline) | set(history):
        counter = cls(default=baseline.get(value, 0))
        if value in history:
            counter.set_many(history[value])
        result[value] = counter
    return result
@classmethod
def from_csv(
    cls,
    filename,
    time_column=0,
    value_column=1,
    time_transform=None,
    value_transform=None,
    skip_header=True,
    default=None,
    delimiter=",",
):
    """Load time series data from a CSV file.

    Args:
        filename (str): Path to the CSV file to read
        time_column (int): Index of the column containing time values
            (default: 0)
        value_column (int): Index of the column containing measurement
            values (default: 1)
        time_transform (callable, optional): Function to transform
            time strings to desired format. Default converts strings
            like "2020-01-01 12:00:00" to datetime objects.
        value_transform (callable, optional): Function to transform
            value strings. Default leaves values as strings.
        skip_header (bool): Whether to skip the first row of the file
            (default: True). An empty file is tolerated.
        default (any): Default value for the time series
        delimiter (str): CSV delimiter character (default: ",")

    Returns:
        TimeSeries: A new TimeSeries object with the data from the
        CSV. Completely blank lines in the file are ignored.

    Examples:
        >>> # Basic usage with default settings
        >>> ts = TimeSeries.from_csv("data.csv")
        >>>
        >>> # Custom time parsing
        >>> import datetime
        >>> ts = TimeSeries.from_csv(
        ...     "data.csv",
        ...     time_transform=lambda s: datetime.datetime.strptime(s, "%Y-%m-%dT%H:%M:%S")
        ... )
        >>>
        >>> # Convert values to integers
        >>> ts = TimeSeries.from_csv(
        ...     "data.csv",
        ...     value_transform=int,
        ...     default=0
        ... )

    """
    # use default transformations if not specified
    if time_transform is None:

        def time_transform(s):
            return datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S")

    if value_transform is None:

        def value_transform(s):
            return s

    result = cls(default=default)

    # newline="" hands line-ending handling to the csv module, so
    # quoted fields that contain line breaks are read back intact.
    with open(filename, newline="") as infile:
        reader = csv.reader(infile, delimiter=delimiter)
        if skip_header:
            # A default avoids leaking StopIteration on an empty file.
            next(reader, None)
        result.set_many(
            (
                time_transform(row[time_column]),
                value_transform(row[value_column]),
            )
            for row in reader
            if row  # csv.reader yields [] for blank lines
        )
    return result
@classmethod
def from_json(
    cls,
    filename=None,
    json_string=None,
    time_key="time",
    value_key="value",
    time_transform=None,
    value_transform=None,
    default=None,
):
    """Load time series data from a JSON file or string.

    The JSON should be either:
    1. A list of objects/dictionaries with time and value keys
    2. A single object/dictionary with time keys and value values

    Args:
        filename (str, optional): Path to the JSON file. The file is
            read as bytes, so its text encoding (UTF-8, UTF-16 or
            UTF-32) is detected by the ``json`` module rather than
            taken from the locale.
        json_string (str, optional): JSON string (used if filename
            not provided)
        time_key (str): The key for time values in each record
            (default: "time")
        value_key (str): The key for measurement values in each
            record (default: "value")
        time_transform (callable, optional): Function to transform
            time values to desired format. Default converts ISO
            format strings (a "Z" is treated as "+00:00") to datetime
            objects and leaves non-string times untouched.
        value_transform (callable, optional): Function to transform
            measurement values
        default (any): Default value for the time series

    Returns:
        TimeSeries: A new TimeSeries object with the data from the
        JSON

    Raises:
        ValueError: If neither filename nor json_string is given.
        TypeError: If the decoded JSON is neither a list nor a dict.

    Examples:
        >>> # From a list of records
        >>> ts = TimeSeries.from_json('data.json')
        >>>
        >>> # From a JSON string with custom keys
        >>> ts = TimeSeries.from_json(
        ...     json_string='[{"timestamp": "2020-01-01T00:00:00", "temp": 20.5}]',
        ...     time_key="timestamp",
        ...     value_key="temp"
        ... )
        >>>
        >>> # With custom time parsing
        >>> ts = TimeSeries.from_json(
        ...     'data.json',
        ...     time_transform=lambda t: datetime.datetime.fromtimestamp(float(t))
        ... )

    """
    import json

    # Set default transformations if not specified
    if time_transform is None:

        def time_transform(t):
            if isinstance(t, str):
                return datetime.datetime.fromisoformat(
                    t.replace("Z", "+00:00")
                )
            return t

    if value_transform is None:

        def value_transform(v):
            return v

    result = cls(default=default)

    # Load JSON from either file or string
    if filename is not None:
        # Binary mode: json.load decodes bytes itself, which keeps
        # the result independent of the platform's default encoding.
        with open(filename, "rb") as infile:
            data = json.load(infile)
    elif json_string is not None:
        data = json.loads(json_string)
    else:
        msg = "Either filename or json_string must be provided"
        raise ValueError(msg)

    # Handle list format [{"time": t1, "value": v1}, ...]
    if isinstance(data, list):
        result.set_many(
            (
                time_transform(record[time_key]),
                value_transform(record[value_key]),
            )
            for record in data
        )
    # Handle dictionary format {"t1": v1, "t2": v2, ...}
    elif isinstance(data, dict):
        result.set_many(
            (time_transform(time_str), value_transform(value))
            for time_str, value in data.items()
        )
    else:
        msg = "JSON data must be either a list or dictionary"
        raise TypeError(msg)

    return result
def to_json(
    self,
    filename=None,
    time_transform=None,
    value_transform=None,
    dict_format=False,
):
    """Serialize the time series to JSON, as a file or as a string.

    Args:
        filename (str, optional): Path where JSON file will be
            written. If None, returns a JSON string instead.
        time_transform (callable, optional): Applied to every time
            before serializing. By default, anything with an
            ``isoformat`` method is converted through it and other
            values are passed along unchanged.
        value_transform (callable, optional): Applied to every value
            before serializing. Default is the identity.
        dict_format (bool): If True, emit one object mapping times to
            values. If False (default), emit a list of objects with
            "time" and "value" keys.

    Returns:
        str or None: The JSON text when ``filename`` is None;
        otherwise the text is written to the file and None is
        returned.

    Examples:
        >>> # Export to a file using default settings
        >>> ts.to_json('output.json')
        >>>
        >>> # Get JSON as a string and customize time formatting
        >>> json_str = ts.to_json(
        ...     time_transform=lambda dt: dt.timestamp()
        ... )
        >>>
        >>> # Use dictionary format instead of list format
        >>> ts.to_json('output.json', dict_format=True)

    """
    import json

    if time_transform is None:

        def time_transform(t):
            return t.isoformat() if hasattr(t, "isoformat") else t

    if value_transform is None:

        def value_transform(v):
            return v

    converted = (
        (time_transform(t), value_transform(v)) for t, v in self.items()
    )

    if dict_format:
        # {"t1": v1, "t2": v2, ...}
        payload = dict(converted)
    else:
        # [{"time": t1, "value": v1}, ...]
        payload = [{"time": t, "value": v} for t, v in converted]

    if filename is None:
        return json.dumps(payload, indent=2)

    with open(filename, "w") as outfile:
        json.dump(payload, outfile, indent=2)
    return None
def operation(self, other, function, default=None):
    """Apply ``function`` pointwise against a series or a constant.

    With another TimeSeries::

        operation(t) = function(self(t), other(t))

    and the result has a measurement at every time where either
    input has one. With anything else, ``other`` is treated as a
    constant::

        operation(t) = function(self(t), other)

    and the measurement times are those of ``self``.

    The result is always a plain ``TimeSeries`` whose default is the
    ``default`` argument.
    """
    # todo: consider the best way to deal with default, and make
    # consistent with other methods. check to_bool maybe
    result = TimeSeries(default=default)

    if not isinstance(other, TimeSeries):
        for time, value in self:
            result[time] = function(value, other)
        return result

    # Evaluate at our own measurement times, then at the other
    # series' times.
    for time, value in self:
        result[time] = function(value, other[time])
    for time, value in other:
        result[time] = function(self[time], value)
    return result
def to_bool(self, invert=False, default=NotGiven):
    """Return a series holding the truth value of each measurement.

    Args:
        invert: If true, every truth value is negated.
        default: Default of the returned series. When not passed, a
            ``None`` default on this series is kept as ``None`` (it
            often means "undefined" rather than "false"); any other
            default is cast to bool, and negated when ``invert`` is
            true. When passed, it is used exactly as given.

    Returns:
        :obj:`TimeSeries` with the results.

    """
    if default is not NotGiven:
        # should this complain if default not in {None, True, False}?
        new_default = default
    elif self.default is None:
        new_default = None
    else:
        truth = bool(self.default)
        new_default = (not truth) if invert else truth

    # `operation` always calls with two arguments; the second one (the
    # constant None passed below) is ignored.
    def function(x, y):
        return (not bool(x)) if invert else bool(x)

    return self.operation(None, function, default=new_default)
def threshold(self, value, inclusive=False):
    """Compare every measurement against a threshold.

    The result is True where the measurement is strictly greater
    than ``value`` (greater than or equal when ``inclusive=True``).
    """
    # todo: this seems like it's wrong... make a test to check (and fix if so!)
    # todo: deal with default

    def exceeds(x, y):
        return x >= y if inclusive else x > y

    return self.operation(value, exceeds)
def sum(self, other):
    """Return the series ``self(t) + other(t)``.

    Built by merging both series with ``operations.ignorant_sum``.
    """
    # todo: better consistency and documentation about when Nones are ignored
    operands = [self, other]
    return TimeSeries.merge(operands, operation=operations.ignorant_sum)
def difference(self, other):
    """Return the series ``self(t) - other(t)``."""

    def subtract(x, y):
        return x - y

    return self.operation(other, subtract)
def multiply(self, other):
    """Return the series ``self(t) * other(t)``."""

    def product(x, y):
        return x * y

    return self.operation(other, product)
def logical_and(self, other):
    """Return the series ``self(t) and other(t)``.

    Python's ``and`` is used as-is, so the result holds one of the
    two operands rather than a strict bool.
    """

    def both(x, y):
        return x and y

    return self.operation(other, both)
def logical_or(self, other):
    """Return the series ``self(t) or other(t)``.

    Python's ``or`` is used as-is, so the result holds one of the
    two operands rather than a strict bool.
    """

    def either(x, y):
        return x or y

    return self.operation(other, either)
def logical_xor(self, other):
    """Return the series ``bool(self(t)) ^ bool(other(t))``."""

    def exactly_one(x, y):
        return bool(x) ^ bool(y)

    return self.operation(other, exactly_one)
def __setitem__(self, time, value): """Allow a[time] = value syntax or a a[start:end]=value.""" if isinstance(time, slice): return self.set_interval(time.start, time.stop, value) else: return self.set(time, value) def __getitem__(self, time): """Allow a[time] syntax.""" if isinstance(time, slice): msg = "Syntax a[start:end] not allowed" raise ValueError(msg) # noqa: TRY004 else: return self.get(time) def __delitem__(self, time): """Allow del[time] syntax.""" if isinstance(time, slice): return self.remove_points_from_interval(time.start, time.stop) else: return self.remove(time) def __add__(self, other): """Allow a + b syntax""" return self.sum(other) def __radd__(self, other): """Allow the operation 0 + TimeSeries() so that builtin sum function works on an iterable of TimeSeries. """ # skip type check if other is the integer 0 if other != 0: self._check_time_series(other) # 0 + self = self return self def __sub__(self, other): """Allow a - b syntax""" return self.difference(other) def __mul__(self, other): """Allow a * b syntax""" return self.multiply(other) def __and__(self, other): """Allow a & b syntax""" return self.logical_and(other) def __or__(self, other): """Allow a | b syntax""" return self.logical_or(other) def __xor__(self, other): """Allow a ^ b syntax""" return self.logical_xor(other) def __invert__(self): """Allow ~a syntax""" return self.to_bool(invert=True) def __eq__(self, other): return self.items() == other.items() def __ne__(self, other): return not (self == other) def _check_boundary(self, value, allow_infinite, lower_or_upper): if lower_or_upper == "lower": infinity_value = -infinity.inf method_name = "first_key" elif lower_or_upper == "upper": infinity_value = infinity.inf method_name = "last_key" else: msg = ( f'`lower_or_upper` must be "lower" or "upper", ' f"got {lower_or_upper}" ) raise ValueError(msg) if value is None: if allow_infinite: return infinity_value else: try: return getattr(self, method_name)() except IndexError as error: msg 
= ( f"can't use '{method_name}' for default " f"{lower_or_upper} boundary of empty TimeSeries" ) raise KeyError(msg) from error else: return value def _check_boundaries(self, start, end, mask=None, allow_infinite=False): if mask is not None and mask.is_empty(): msg = "mask can not be empty" raise ValueError(msg) # if only a mask is passed in, return mask boundaries and mask if start is None and end is None and mask is not None: return mask.first_key(), mask.last_key(), mask # replace with defaults if not given start = self._check_boundary(start, allow_infinite, "lower") end = self._check_boundary(end, allow_infinite, "upper") if start >= end: msg = f"start can't be >= end ({start} >= {end})" raise ValueError(msg) start_end_mask = TimeSeries(default=False) start_end_mask[start] = True start_end_mask[end] = False mask = start_end_mask if mask is None else mask & start_end_mask return start, end, mask def distribution_by_hour_of_day( self, first=0, last=23, start=None, end=None ): start, end, mask = self._check_boundaries(start, end) result = [] for hour in range(first, last + 1): mask = hour_of_day(start, end, hour) result.append((hour, self.distribution(mask=mask))) return result def distribution_by_day_of_week( self, first=0, last=6, start=None, end=None ): start, end, mask = self._check_boundaries(start, end) result = [] for week in range(first, last + 1): mask = day_of_week(start, end, week) result.append((week, self.distribution(mask=mask))) return result
def plot(
    self,
    interpolate="previous",
    figure_width=12,
    linewidth=1,
    marker="o",
    markersize=3,
    color="#222222",
):
    """Draw the time series; delegates to the package's ``plot.plot``.

    Every argument is forwarded unchanged, and whatever ``plot.plot``
    returns is returned.

    Args:
        interpolate (str): Interpolation method between points.
            Options are:
            - "previous": Step-like plot where each value stays
              constant until the next value (default)
            - "linear": Straight lines between data points
        figure_width (float): Width of the figure in inches
            (default: 12)
        linewidth (float): Width of the connecting lines (default: 1)
        marker (str): Marker style for data points, using matplotlib
            marker notation (default: "o" for circular markers)
        markersize (float): Size of the markers for data points
            (default: 3)
        color (str): Color of the line and markers
            (default: "#222222")

    Returns:
        tuple: A tuple containing (figure, axes) matplotlib objects
        that can be further customized or saved to a file.

    Raises:
        ImportError: If matplotlib is not installed
        ValueError: If an invalid interpolation method is specified

    Examples:
        >>> ts = TimeSeries()
        >>> ts[0] = 0
        >>> ts[1] = 2
        >>> ts[3] = 1
        >>>
        >>> # Basic plot with default settings
        >>> fig, ax = ts.plot()
        >>>
        >>> # Custom plot with linear interpolation
        >>> fig, ax = ts.plot(
        ...     interpolate="linear",
        ...     figure_width=10,
        ...     linewidth=2,
        ...     marker="s",
        ...     markersize=5,
        ...     color="#FF5733"
        ... )
        >>>
        >>> # Save the plot to a file
        >>> fig.savefig("my_timeseries.png")

    """
    options = {
        "interpolate": interpolate,
        "figure_width": figure_width,
        "linewidth": linewidth,
        "marker": marker,
        "markersize": markersize,
        "color": color,
    }
    return plot.plot(self, **options)
def hour_of_day(start, end, hour):
    """Build a boolean mask that is True during one hour of each day.

    For every day produced by ``utils.datetime_range`` from the
    floored ``start`` through ``end``, the mask is True from
    ``hour`` o'clock for one hour. The mask is then sliced to
    ``[start, end]`` and forced to False at ``end``.
    """
    one_hour = datetime.timedelta(hours=1)

    # start should be date, or if datetime, will use date of datetime
    first_day = utils.datetime_floor(start)

    domain = TimeSeries(default=False)
    days = utils.datetime_range(first_day, end, "days", inclusive_end=True)
    for day_start in days:
        opens = day_start + datetime.timedelta(hours=hour)
        closes = opens + one_hour
        domain[opens] = True
        domain[closes] = False

    result = domain.slice(start, end)
    result[end] = False
    return result


def day_of_week(start, end, weekday):
    """Build a boolean mask that is True on one weekday of each week.

    ``weekday`` may be a name or a number (resolved through
    ``utils.weekday_number``). Starting at the first matching day on
    or after the floored ``start``, the mask is True for one day in
    every week through ``end``. It is then sliced to ``[start, end]``
    and forced to False at ``end``.
    """
    one_day = datetime.timedelta(days=1)

    # allow weekday name or number
    number = utils.weekday_number(weekday)

    # start should be date, or if datetime, will use date of datetime
    floored = utils.datetime_floor(start)

    # Scan the seven days from `floored` for the first one falling on
    # the requested weekday.
    week_later = floored + datetime.timedelta(days=7)
    for candidate in utils.datetime_range(floored, week_later, "days"):
        if candidate.weekday() == number:
            first_day = candidate
            break

    domain = TimeSeries(default=False)
    weeks = utils.datetime_range(first_day, end, "weeks", inclusive_end=True)
    for week_start in weeks:
        domain[week_start] = True
        domain[week_start + one_day] = False

    result = domain.slice(start, end)
    result[end] = False
    return result