import math
import sortedcontainers
class UnorderableElements(TypeError):
pass
class UnhashableType(TypeError):
pass
[docs]class Histogram(sortedcontainers.SortedDict):
@classmethod
def from_dict(cls, in_dict, *args, **kwargs):
self = cls(*args, **kwargs)
for key, value in in_dict.items():
self[key] = value
return self
def __init__(self, data=(), **kwargs):
if 'key' in kwargs:
super(Histogram, self).__init__(kwargs['key'])
else:
super(Histogram, self).__init__()
for datum in data:
self[datum] += 1
def __getitem__(self, key):
try:
result = super(Histogram, self).__getitem__(key)
except KeyError:
result = 0
except TypeError as error:
if "unorderable" in str(error):
raise UnorderableElements(error)
if "unhashable" in str(error):
msg = "Can't make histogram of unhashable type (%s)" % type(
key)
raise UnhashableType(msg)
raise error
return result
def __setitem__(self, key, value):
try:
result = super(Histogram, self).__setitem__(key, value)
except TypeError as error:
if "unorderable" in str(error):
raise UnorderableElements(error)
if "not supported between instances of" in str(error):
raise UnorderableElements(error)
if "unhashable" in str(error):
msg = "Can't make histogram of unhashable type (%s)" % type(
key)
raise UnhashableType(msg)
raise error
return result
[docs] def total(self):
"""Sum of values."""
return sum(self.values())
def _prepare_for_stats(self):
"""Removes None values and calculates total."""
clean = self._discard_value(None)
total = float(clean.total())
return clean, total
[docs] def mean(self):
"""Mean of the distribution."""
clean, total = self._prepare_for_stats()
if not total:
return None
weighted_sum = sum(key * value for key, value in clean.items())
return weighted_sum / total
[docs] def variance(self):
"""Variance of the distribution."""
clean, total = self._prepare_for_stats()
if not total:
return None
mean = self.mean()
weighted_central_moment = sum(
count * (value - mean)**2 for value, count in clean.items()
)
return weighted_central_moment / total
[docs] def standard_deviation(self):
"""Standard deviation of the distribution."""
clean, total = self._prepare_for_stats()
if not total:
return None
return math.sqrt(clean.variance())
[docs] def normalized(self):
"""Return a normalized version of the histogram where the values sum
to one.
"""
total = self.total()
result = Histogram()
for value, count in self.items():
try:
result[value] = count / float(total)
except UnorderableElements:
result = Histogram.from_dict(dict(result), key=hash)
result[value] = count / float(total)
return result
def _discard_value(self, value):
if value not in self:
return self
else:
return self.__class__.from_dict(
{k: v for k, v in self.items() if k is not value}
)
[docs] def max(self, include_zero=False):
"""Maximum observed value with non-zero count."""
for key, value in reversed(self.items()):
if value > 0 or include_zero:
return key
[docs] def min(self, include_zero=False):
"""Minimum observed value with non-zero count."""
for key, value in self.items():
if value > 0 or include_zero:
return key
def _quantile_function(self, alpha=0.5, smallest_count=None):
"""Return a function that returns the quantile values for this
histogram.
"""
clean, total = self._prepare_for_stats()
if not total:
return lambda q: None
smallest_observed_count = min(clean.values())
if smallest_count is None:
smallest_count = smallest_observed_count
else:
smallest_count = min(smallest_count, smallest_observed_count)
beta = alpha * smallest_count
debug_plot = []
cumulative_sum = 0.0
inverse = sortedcontainers.SortedDict()
for value, count in clean.items():
debug_plot.append((cumulative_sum / total, value))
inverse[(cumulative_sum + beta) / total] = value
cumulative_sum += count
inverse[(cumulative_sum - beta) / total] = value
debug_plot.append((cumulative_sum / total, value))
# get maximum and minumum q values
q_min = inverse.keys()[0]
q_max = inverse.keys()[-1]
# this stuff if helpful for debugging -- keep it in here
# for i, j in debug_plot:
# print i, j
# print ''
# for i, j in inverse.items():
# print i, j
# print ''
def function(q):
if q < 0.0 or q > 1.0:
msg = 'invalid quantile {}, need `0 <= q <= 1`'.format(q)
raise ValueError(msg)
elif q < q_min:
q = q_min
elif q > q_max:
q = q_max
# if beta is
if beta > 0:
if q in inverse:
result = inverse[q]
else:
previous_index = inverse.bisect_left(q) - 1
x1 = inverse.keys()[previous_index]
x2 = inverse.keys()[previous_index + 1]
y1 = inverse[x1]
y2 = inverse[x2]
result = (y2 - y1) * (q - x1) / float(x2 - x1) + y1
else:
if q in inverse:
previous_index = inverse.bisect_left(q) - 1
x1 = inverse.keys()[previous_index]
x2 = inverse.keys()[previous_index + 1]
y1 = inverse[x1]
y2 = inverse[x2]
result = 0.5 * (y1 + y2)
else:
previous_index = inverse.bisect_left(q) - 1
x1 = inverse.keys()[previous_index]
result = inverse[x1]
return float(result)
return function
def median(self, alpha=0.5, smallest_count=None):
return self.quantile(0.5, alpha=alpha, smallest_count=smallest_count)
def quantiles(self, q_list, alpha=0.5, smallest_count=None):
f = self._quantile_function(alpha=alpha, smallest_count=smallest_count)
return [f(q) for q in q_list]
def quantile(self, q, alpha=0.5, smallest_count=None):
return \
self.quantiles([q], alpha=alpha, smallest_count=smallest_count)[0]
def add(self, other):
result = Histogram()
for key, value in self.items():
result[key] += value
for key, value in other.items():
result[key] += value
return result
__add__ = add