"""A class for manipulating time series based on measurements at
unevenly-spaced times, see:
http://en.wikipedia.org/wiki/Unevenly_spaced_time_series
"""
# standard library
import csv
import datetime
import pprint
from itertools import tee
from queue import PriorityQueue
# 3rd party
from dateutil.parser import parse as date_parse
import sortedcontainers
from infinity import inf
from traces import operations
# local
from . import histogram, utils
# python 2/3 compatibility
try:
import itertools.izip as zip
except ImportError:
pass
[docs]class TimeSeries(object):
"""A class to help manipulate and analyze time series that are the
result of taking measurements at irregular points in time. For
example, here would be a simple time series that starts at 8am and
goes to 9:59am:
>>> ts = TimeSeries()
>>> ts['8:00am'] = 0
>>> ts['8:47am'] = 1
>>> ts['8:51am'] = 0
>>> ts['9:15am'] = 1
>>> ts['9:59am'] = 0
The value of the time series is the last recorded measurement: for
example, at 8:05am the value is 0 and at 8:48am the value is 1. So:
>>> ts['8:05am']
0
>>> ts['8:48am']
1
There are also a bunch of things for operating on another time
series: sums, difference, logical operators and such.
"""
def __init__(self, data=None, default=None):
self._d = sortedcontainers.SortedDict(data)
self.default = default
self.getter_functions = {
"previous": self._get_previous,
"linear": self._get_linear_interpolate,
}
def __getstate__(self):
return {
"data": self.items(),
"default": self.default,
}
def __setstate__(self, state):
self.__init__(**state)
def __iter__(self):
"""Iterate over sorted (time, value) pairs."""
return iter(self._d.items())
def __bool__(self):
return bool(self._d)
def is_empty(self):
return len(self) == 0
@property
def default(self):
"""Return the default value of the time series."""
return self._default
@default.setter
def default(self, value):
"""Set the default value of the time series."""
self._default = value
def _get_linear_interpolate(self, time):
right_index = self._d.bisect_right(time)
left_index = right_index - 1
if left_index < 0:
return self.default
elif right_index == len(self._d):
# right of last measurement
return self.last_item()[1]
else:
left_time, left_value = self._d.peekitem(left_index)
right_time, right_value = self._d.peekitem(right_index)
dt_interval = right_time - left_time
dt_start = time - left_time
if isinstance(dt_interval, datetime.timedelta):
dt_interval = dt_interval.total_seconds()
dt_start = dt_start.total_seconds()
slope = float(right_value - left_value) / dt_interval
value = slope * dt_start + left_value
return value
def _get_previous(self, time):
right_index = self._d.bisect_right(time)
left_index = right_index - 1
if right_index > 0:
_, left_value = self._d.peekitem(left_index)
return left_value
elif right_index == 0:
return self.default
else:
msg = (
"self._d.bisect_right({}) returned a negative value. "
"""This "can't" happen: please file an issue at """
"https://github.com/datascopeanalytics/traces/issues"
).format(time)
raise ValueError(msg)
[docs] def get(self, time, interpolate="previous"):
"""Get the value of the time series, even in-between measured values.
"""
try:
getter = self.getter_functions[interpolate]
except KeyError:
msg = (
"unknown value '{}' for interpolate, "
"valid values are in [{}]"
).format(interpolate, ", ".join(self.getter_functions))
raise ValueError(msg)
else:
return getter(time)
[docs] def get_item_by_index(self, index):
"""Get the (t, value) pair of the time series by index."""
return self._d.peekitem(index)
[docs] def last_item(self):
"""Returns the last (time, value) pair of the time series."""
return self.get_item_by_index(-1)
[docs] def last_key(self):
"""Returns the last time recorded in the time series"""
return self.last_item()[0]
[docs] def last_value(self):
"""Returns the last recorded value in the time series"""
return self.last_item()[1]
[docs] def first_item(self):
"""Returns the first (time, value) pair of the time series."""
return self.get_item_by_index(0)
[docs] def first_key(self):
"""Returns the first time recorded in the time series"""
return self.first_item()[0]
[docs] def first_value(self):
"""Returns the first recorded value in the time series"""
return self.first_item()[1]
[docs] def set(self, time, value, compact=False):
"""Set the value for the time series. If compact is True, only set the
value if it's different from what it would be anyway.
"""
if (
(len(self) == 0)
or (not compact)
or (compact and self.get(time) != value)
):
self._d[time] = value
[docs] def set_interval(self, start, end, value, compact=False):
"""Set the value for the time series on an interval. If compact is
True, only set the value if it's different from what it would
be anyway.
"""
# for each interval to render
for i, (s, e, v) in enumerate(self.iterperiods(start, end)):
# look at all intervals included in the current interval
# (always at least 1)
if i == 0:
# if the first, set initial value to new value of range
self.set(s, value, compact)
else:
# otherwise, remove intermediate key
del self[s]
# finish by setting the end of the interval to the previous value
self.set(end, v, compact)
[docs] def compact(self):
"""Convert this instance to a compact version: the value will be the
same at all times, but repeated measurements are discarded.
"""
previous_value = object()
redundant = []
for time, value in self:
if value == previous_value:
redundant.append(time)
previous_value = value
for time in redundant:
del self[time]
[docs] def items(self):
"""ts.items() -> list of the (key, value) pairs in ts, as 2-tuples"""
return self._d.items()
[docs] def exists(self):
"""returns False when the timeseries has a None value, True
otherwise
"""
result = TimeSeries(default=False if self.default is None else True)
for t, v in self:
result[t] = False if v is None else True
return result
[docs] def remove(self, time):
"""Allow removal of measurements from the time series. This throws an
error if the given time is not actually a measurement point.
"""
try:
del self._d[time]
except KeyError:
raise KeyError("no measurement at {}".format(time))
[docs] def remove_points_from_interval(self, start, end):
"""Allow removal of all points from the time series within a interval
[start:end].
"""
for s, e, v in self.iterperiods(start, end):
try:
del self._d[s]
except KeyError:
pass
[docs] def n_measurements(self):
"""Return the number of measurements in the time series."""
return len(self._d)
def __len__(self):
"""Number of points in the TimeSeries."""
return self.n_measurements()
def __repr__(self):
return "<TimeSeries>\n%s\n</TimeSeries>" % pprint.pformat(self._d)
[docs] def iterintervals(self, n=2):
"""Iterate over groups of `n` consecutive measurement points in the
time series.
"""
# tee the original iterator into n identical iterators
streams = tee(iter(self), n)
# advance the "cursor" on each iterator by an increasing
# offset, e.g. if n=3:
#
# [a, b, c, d, e, f, ..., w, x, y, z]
# first cursor --> *
# second cursor --> *
# third cursor --> *
for stream_index, stream in enumerate(streams):
for _ in range(stream_index):
next(stream)
# now, zip the offset streams back together to yield tuples,
# in the n=3 example it would yield:
# (a, b, c), (b, c, d), ..., (w, x, y), (x, y, z)
for intervals in zip(*streams):
yield intervals
@staticmethod
def _value_function(value):
# if value is None, don't filter
if value is None:
def value_function(t0_, t1_, value_):
return True
# if value is a function, use the function to filter
elif callable(value):
value_function = value
# if value is a constant other than None, then filter to
# return only the intervals where the value equals the
# constant
else:
def value_function(t0_, t1_, value_):
return value_ == value
return value_function
[docs] def iterperiods(self, start=None, end=None, value=None):
"""This iterates over the periods (optionally, within a given time
span) and yields (interval start, interval end, value) tuples.
TODO: add mask argument here.
"""
start, end, mask = self._check_boundaries(
start, end, allow_infinite=False
)
value_function = self._value_function(value)
# get start index and value
start_index = self._d.bisect_right(start)
if start_index:
_, start_value = self._d.peekitem(start_index - 1)
else:
start_value = self.default
# get last index before end of time span
end_index = self._d.bisect_right(end)
interval_t0, interval_value = start, start_value
for interval_t1 in self._d.islice(start_index, end_index):
if value_function(interval_t0, interval_t1, interval_value):
yield interval_t0, interval_t1, interval_value
# set start point to the end of this interval for next
# iteration
interval_t0 = interval_t1
interval_value = self[interval_t0]
# yield the time, duration, and value of the final period
if interval_t0 < end:
if value_function(interval_t0, end, interval_value):
yield interval_t0, end, interval_value
[docs] def slice(self, start, end):
"""Return an equivalent TimeSeries that only has points between
`start` and `end` (always starting at `start`)
"""
start, end, mask = self._check_boundaries(
start, end, allow_infinite=True
)
result = TimeSeries(default=self.default)
for t0, t1, value in self.iterperiods(start, end):
result[t0] = value
result[t1] = self[t1]
return result
def _check_regularization(self, start, end, sampling_period=None):
# only do these checks if sampling period is given
if sampling_period is not None:
# cast to both seconds and timedelta for error checking
if isinstance(sampling_period, datetime.timedelta):
sampling_period_seconds = sampling_period.total_seconds()
sampling_period_timedelta = sampling_period
else:
sampling_period_seconds = sampling_period
sampling_period_timedelta = datetime.timedelta(
seconds=sampling_period
)
if sampling_period_seconds <= 0:
msg = "sampling_period must be > 0"
raise ValueError(msg)
if sampling_period_seconds > utils.duration_to_number(end - start):
msg = (
"sampling_period "
"is greater than the duration between "
"start and end."
)
raise ValueError(msg)
if isinstance(start, datetime.datetime):
sampling_period = sampling_period_timedelta
else:
sampling_period = sampling_period_seconds
return sampling_period
[docs] def sample(
self, sampling_period, start=None, end=None, interpolate="previous"
):
"""Sampling at regular time periods.
"""
start, end, mask = self._check_boundaries(start, end)
sampling_period = self._check_regularization(
start, end, sampling_period
)
result = []
current_time = start
while current_time <= end:
value = self.get(current_time, interpolate=interpolate)
result.append((current_time, value))
current_time += sampling_period
return result
[docs] def moving_average(
self,
sampling_period,
window_size=None,
start=None,
end=None,
placement="center",
pandas=False,
):
"""Averaging over regular intervals
"""
start, end, mask = self._check_boundaries(start, end)
# default to sampling_period if not given
if window_size is None:
window_size = sampling_period
sampling_period = self._check_regularization(
start, end, sampling_period
)
# convert to datetime if the times are datetimes
full_window = float(window_size)
half_window = full_window / 2
if isinstance(start, datetime.datetime) and not isinstance(
full_window, datetime.timedelta
):
half_window = datetime.timedelta(seconds=half_window)
full_window = datetime.timedelta(seconds=full_window)
result = []
current_time = start
while current_time <= end:
if placement == "center":
window_start = current_time - half_window
window_end = current_time + half_window
elif placement == "left":
window_start = current_time
window_end = current_time + full_window
elif placement == "right":
window_start = current_time - full_window
window_end = current_time
else:
msg = 'unknown placement "{}"'.format(placement)
raise ValueError(msg)
# calculate mean over window and add (t, v) tuple to list
try:
mean = self.mean(window_start, window_end)
except TypeError as e:
if "NoneType" in str(e):
mean = None
else:
raise e
result.append((current_time, mean))
current_time += sampling_period
# convert to pandas Series if pandas=True
if pandas:
try:
import pandas as pd
except ImportError:
msg = "can't have pandas=True if pandas is not installed"
raise ImportError(msg)
result = pd.Series(
[v for t, v in result], index=[t for t, v in result],
)
return result
@staticmethod
def rebin(binned, key_function):
result = sortedcontainers.SortedDict()
for bin_start, value in binned.items():
new_bin_start = key_function(bin_start)
try:
result[new_bin_start] += value
except KeyError:
result[new_bin_start] = value
return result
def bin(
self,
unit,
n_units=1,
start=None,
end=None,
mask=None,
smaller=None,
transform="distribution",
):
# return an empty sorted dictionary if there is no time span
if mask is not None and mask.is_empty():
return sortedcontainers.SortedDict()
elif start is not None and start == end:
return sortedcontainers.SortedDict()
# use smaller if available
if smaller:
return self.rebin(
smaller, lambda x: utils.datetime_floor(x, unit, n_units),
)
start, end, mask = self._check_boundaries(start, end, mask=mask)
start = utils.datetime_floor(start, unit=unit, n_units=n_units)
function = getattr(self, transform)
result = sortedcontainers.SortedDict()
dt_range = utils.datetime_range(start, end, unit, n_units=n_units)
for bin_start, bin_end in utils.pairwise(dt_range):
result[bin_start] = function(
bin_start, bin_end, mask=mask, normalized=False
)
return result
[docs] def mean(self, start=None, end=None, mask=None, interpolate="previous"):
"""This calculated the average value of the time series over the given
time range from `start` to `end`, when `mask` is truthy.
"""
return self.distribution(
start=start, end=end, mask=mask, interpolate=interpolate
).mean()
[docs] def distribution(
self,
start=None,
end=None,
normalized=True,
mask=None,
interpolate="previous",
):
"""Calculate the distribution of values over the given time range from
`start` to `end`.
Args:
start (orderable, optional): The lower time bound of
when to calculate the distribution. By default, the
first time point will be used.
end (orderable, optional): The upper time bound of
when to calculate the distribution. By default, the
last time point will be used.
normalized (bool): If True, distribution will sum to
one. If False and the time values of the TimeSeries
are datetimes, the units will be seconds.
mask (:obj:`TimeSeries`, optional): A domain on which to
calculate the distribution.
interpolate (str, optional): Method for interpolating
between measurement points: either "previous"
(default) or "linear". Note: if "previous" is used,
then the resulting histogram is exact. If "linear" is
given, then the values used for the histogram are the
average value for each segment -- the mean of this
histogram will be exact, but higher moments (variance)
will be approximate.
Returns:
:obj:`Histogram` with the results.
"""
start, end, mask = self._check_boundaries(start, end, mask=mask)
counter = histogram.Histogram()
for i_start, i_end, _ in mask.iterperiods(value=True):
for t0, t1, _ in self.iterperiods(i_start, i_end):
duration = utils.duration_to_number(t1 - t0, units="seconds",)
midpoint = utils.time_midpoint(t0, t1)
value = self.get(midpoint, interpolate=interpolate)
try:
counter[value] += duration
except histogram.UnorderableElements:
counter = histogram.Histogram.from_dict(
dict(counter), key=hash
)
counter[value] += duration
# divide by total duration if result needs to be normalized
if normalized:
return counter.normalized()
else:
return counter
[docs] def n_points(
self,
start=-inf,
end=+inf,
mask=None,
include_start=True,
include_end=False,
normalized=False,
):
"""Calculate the number of points over the given time range from
`start` to `end`.
Args:
start (orderable, optional): The lower time bound of when
to calculate the distribution. By default, start is
-infinity.
end (orderable, optional): The upper time bound of when to
calculate the distribution. By default, the end is
+infinity.
mask (:obj:`TimeSeries`, optional): A
domain on which to calculate the distribution.
Returns:
`int` with the result
"""
# just go ahead and return 0 if we already know it regardless
# of boundaries
if not self.n_measurements():
return 0
start, end, mask = self._check_boundaries(start, end, mask=mask)
count = 0
for i_start, i_end, _ in mask.iterperiods(value=True):
if include_end:
end_count = self._d.bisect_right(i_end)
else:
end_count = self._d.bisect_left(i_end)
if include_start:
start_count = self._d.bisect_left(i_start)
else:
start_count = self._d.bisect_right(i_start)
count += end_count - start_count
if normalized:
count /= float(self.n_measurements())
return count
def _check_time_series(self, other):
"""Function used to check the type of the argument and raise an
informative error message if it's not a TimeSeries.
"""
if not isinstance(other, TimeSeries):
msg = "unsupported operand types(s) for +: %s and %s" % (
type(self),
type(other),
)
raise TypeError(msg)
@staticmethod
def _iter_merge(timeseries_list):
"""This function uses a priority queue to efficiently yield the (time,
value_list) tuples that occur from merging together many time
series.
"""
# cast to list since this is getting iterated over several
# times (causes problem if timeseries_list is a generator)
timeseries_list = list(timeseries_list)
# Create iterators for each timeseries and then add the first
# item from each iterator onto a priority queue. The first
# item to be popped will be the one with the lowest time
queue = PriorityQueue()
for index, timeseries in enumerate(timeseries_list):
iterator = iter(timeseries)
try:
t, value = next(iterator)
except StopIteration:
pass
else:
queue.put((t, index, value, iterator))
# `state` keeps track of the value of the merged
# TimeSeries. It starts with the default. It starts as a list
# of the default value for each individual TimeSeries.
state = [ts.default for ts in timeseries_list]
while not queue.empty():
# get the next time with a measurement from queue
t, index, next_value, iterator = queue.get()
# make a copy of previous state, and modify only the value
# at the index of the TimeSeries that this item came from
state = list(state)
state[index] = next_value
yield t, state
# add the next measurement from the time series to the
# queue (if there is one)
try:
t, value = next(iterator)
except StopIteration:
pass
else:
queue.put((t, index, value, iterator))
[docs] @classmethod
def iter_merge(cls, timeseries_list):
"""Iterate through several time series in order, yielding (time, list)
tuples where list is the values of each individual TimeSeries
in the list at time t.
"""
# using return without an argument is the way to say "the
# iterator is empty" when there is nothing to iterate over
# (the more you know...)
if not timeseries_list:
return
# for ts in timeseries_list:
# if ts.is_floating():
# msg = "can't merge empty TimeSeries with no default value"
# raise KeyError(msg)
# This function mostly wraps _iter_merge, the main point of
# this is to deal with the case of tied times, where we only
# want to yield the last list of values that occurs for any
# group of tied times.
index, previous_t, previous_state = -1, object(), object()
for index, (t, state) in enumerate(cls._iter_merge(timeseries_list)):
if index > 0 and t != previous_t:
yield previous_t, previous_state
previous_t, previous_state = t, state
# only yield final thing if there was at least one element
# yielded by _iter_merge
if index > -1:
yield previous_t, previous_state
[docs] @classmethod
def merge(cls, ts_list, compact=True, operation=None):
"""Iterate through several time series in order, yielding (time,
`value`) where `value` is the either the list of each
individual TimeSeries in the list at time t (in the same order
as in ts_list) or the result of the optional `operation` on
that list of values.
"""
# If operation is not given then the default is the list
# of defaults of all time series
# If operation is given, then the default is the result of
# the operation over the list of all defaults
default = [ts.default for ts in ts_list]
if operation:
default = operation(default)
result = cls(default=default)
for t, merged in cls.iter_merge(ts_list):
if operation is None:
value = merged
else:
value = operation(merged)
result.set(t, value, compact=compact)
return result
@staticmethod
def csv_time_transform(raw):
return date_parse(raw)
@staticmethod
def csv_value_transform(raw):
return str(raw)
@classmethod
def from_csv(
cls,
filename,
time_column=0,
value_column=1,
time_transform=None,
value_transform=None,
skip_header=True,
default=None,
):
# use default on class if not given
if time_transform is None:
time_transform = cls.csv_time_transform
if value_transform is None:
value_transform = cls.csv_value_transform
result = cls(default=default)
with open(filename) as infile:
reader = csv.reader(infile)
if skip_header:
next(reader)
for row in reader:
time = time_transform(row[time_column])
value = value_transform(row[value_column])
result[time] = value
return result
[docs] def operation(self, other, function, **kwargs):
"""Calculate "elementwise" operation either between this TimeSeries
and another one, i.e.
operation(t) = function(self(t), other(t))
or between this timeseries and a constant:
operation(t) = function(self(t), other)
If it's another time series, the measurement times in the
resulting TimeSeries will be the union of the sets of
measurement times of the input time series. If it's a
constant, the measurement times will not change.
"""
result = TimeSeries(**kwargs)
if isinstance(other, TimeSeries):
for time, value in self:
result[time] = function(value, other[time])
for time, value in other:
result[time] = function(self[time], value)
else:
for time, value in self:
result[time] = function(value, other)
return result
[docs] def to_bool(self, invert=False):
"""Return the truth value of each element."""
if invert:
def function(x, y):
return not bool(x)
else:
def function(x, y):
return bool(x)
return self.operation(None, function)
[docs] def threshold(self, value, inclusive=False):
"""Return True if > than treshold value (or >= threshold value if
inclusive=True).
"""
if inclusive:
def function(x, y):
return x >= y
else:
def function(x, y):
return x > y
return self.operation(value, function)
[docs] def sum(self, other):
"""sum(x, y) = x(t) + y(t)."""
return TimeSeries.merge(
[self, other], operation=operations.ignorant_sum
)
[docs] def difference(self, other):
"""difference(x, y) = x(t) - y(t)."""
return self.operation(other, lambda x, y: x - y)
[docs] def multiply(self, other):
"""mul(t) = self(t) * other(t)."""
return self.operation(other, lambda x, y: x * y)
[docs] def logical_and(self, other):
"""logical_and(t) = self(t) and other(t)."""
return self.operation(other, lambda x, y: int(x and y))
[docs] def logical_or(self, other):
"""logical_or(t) = self(t) or other(t)."""
return self.operation(other, lambda x, y: int(x or y))
[docs] def logical_xor(self, other):
"""logical_xor(t) = self(t) ^ other(t)."""
return self.operation(other, lambda x, y: int(bool(x) ^ bool(y)))
def __setitem__(self, time, value):
"""Allow a[time] = value syntax or a a[start:end]=value."""
if isinstance(time, slice):
return self.set_interval(time.start, time.stop, value)
else:
return self.set(time, value)
def __getitem__(self, time):
"""Allow a[time] syntax."""
if isinstance(time, slice):
raise ValueError("Syntax a[start:end] not allowed")
else:
return self.get(time)
def __delitem__(self, time):
"""Allow del[time] syntax."""
if isinstance(time, slice):
return self.remove_points_from_interval(time.start, time.stop)
else:
return self.remove(time)
def __add__(self, other):
"""Allow a + b syntax"""
return self.sum(other)
def __radd__(self, other):
"""Allow the operation 0 + TimeSeries() so that builtin sum function
works on an iterable of TimeSeries.
"""
# skip type check if other is the integer 0
if not other == 0:
self._check_time_series(other)
# 0 + self = self
return self
def __sub__(self, other):
"""Allow a - b syntax"""
return self.difference(other)
def __mul__(self, other):
"""Allow a * b syntax"""
return self.multiply(other)
def __and__(self, other):
"""Allow a & b syntax"""
return self.logical_and(other)
def __or__(self, other):
"""Allow a | b syntax"""
return self.logical_or(other)
def __xor__(self, other):
"""Allow a ^ b syntax"""
return self.logical_xor(other)
def __eq__(self, other):
return self.items() == other.items()
def __ne__(self, other):
return not (self == other)
def _check_boundary(self, value, allow_infinite, lower_or_upper):
if lower_or_upper == "lower":
infinity_value = -inf
method_name = "first_key"
elif lower_or_upper == "upper":
infinity_value = inf
method_name = "last_key"
else:
msg = '`lower_or_upper` must be "lower" or "upper", got {}'.format(
lower_or_upper,
)
raise ValueError(msg)
if value is None:
if allow_infinite:
return infinity_value
else:
try:
return getattr(self, method_name)()
except IndexError:
msg = (
"can't use '{}' for default {} boundary "
"of empty TimeSeries"
).format(method_name, lower_or_upper)
raise KeyError(msg)
else:
return value
def _check_boundaries(self, start, end, mask=None, allow_infinite=False):
if mask is not None and mask.is_empty():
raise ValueError("mask can not be empty")
# if only a mask is passed in, return mask boundaries and mask
if start is None and end is None and mask is not None:
return mask.first_key(), mask.last_key(), mask
# replace with defaults if not given
start = self._check_boundary(start, allow_infinite, "lower")
end = self._check_boundary(end, allow_infinite, "upper")
if start >= end:
msg = "start can't be >= end ({} >= {})".format(start, end)
raise ValueError(msg)
start_end_mask = TimeSeries(default=False)
start_end_mask[start] = True
start_end_mask[end] = False
if mask is None:
mask = start_end_mask
else:
mask = mask & start_end_mask
return start, end, mask
def distribution_by_hour_of_day(
self, first=0, last=23, start=None, end=None
):
start, end, mask = self._check_boundaries(start, end)
result = []
for hour in range(first, last + 1):
mask = hour_of_day(start, end, hour)
result.append((hour, self.distribution(mask=mask)))
return result
def distribution_by_day_of_week(
self, first=0, last=6, start=None, end=None
):
start, end, mask = self._check_boundaries(start, end)
result = []
for week in range(first, last + 1):
mask = day_of_week(start, end, week)
result.append((week, self.distribution(mask=mask)))
return result
def hour_of_day(start, end, hour):
# start should be date, or if datetime, will use date of datetime
floored = utils.datetime_floor(start)
domain = TimeSeries(default=False)
for day_start in utils.datetime_range(
floored, end, "days", inclusive_end=True
):
interval_start = day_start + datetime.timedelta(hours=hour)
interval_end = interval_start + datetime.timedelta(hours=1)
domain[interval_start] = True
domain[interval_end] = False
result = domain.slice(start, end)
result[end] = False
return result
def day_of_week(start, end, weekday):
# allow weekday name or number
number = utils.weekday_number(weekday)
# start should be date, or if datetime, will use date of datetime
floored = utils.datetime_floor(start)
next_week = floored + datetime.timedelta(days=7)
for day in utils.datetime_range(floored, next_week, "days"):
if day.weekday() == number:
first_day = day
break
domain = TimeSeries(default=False)
for week_start in utils.datetime_range(
first_day, end, "weeks", inclusive_end=True
):
interval_start = week_start
interval_end = interval_start + datetime.timedelta(days=1)
domain[interval_start] = True
domain[interval_end] = False
result = domain.slice(start, end)
result[end] = False
return result