Source code for traces.histogram

import math
import sortedcontainers
from six import itervalues, iteritems


class UnorderableElements(TypeError):
    pass


class UnhashableType(TypeError):
    pass


[docs]class Histogram(sortedcontainers.SortedDict): @classmethod def from_dict(cls, in_dict, *args, **kwargs): self = cls(*args, **kwargs) for key, value in iteritems(in_dict): self[key] = value return self def __init__(self, data=(), **kwargs): if 'key' in kwargs: super(Histogram, self).__init__(kwargs['key']) else: super(Histogram, self).__init__() for datum in data: self[datum] += 1 def __getitem__(self, key): try: result = super(Histogram, self).__getitem__(key) except KeyError: result = 0 except TypeError as e: if "unorderable" in str(e): raise UnorderableElements(e) if "unhashable" in str(e): msg = "Can't make histogram of unhashable type (%s)" % type( key) raise UnhashableType(msg) raise e return result def __setitem__(self, key, value): try: result = super(Histogram, self).__setitem__(key, value) except TypeError as e: if "unorderable" in str(e): raise UnorderableElements(e) if "not supported between instances of" in str(e): raise UnorderableElements(e) if "unhashable" in str(e): msg = "Can't make histogram of unhashable type (%s)" % type( key) raise UnhashableType(msg) raise e return result
[docs] def total(self): """Sum of values.""" return sum(itervalues(self))
[docs] def mean(self): """Mean of the distribution.""" _self = self._discard_value(None) if not _self.total(): return None weighted_sum = sum( key * value for key, value in iteritems(_self) ) return weighted_sum / float(_self.total())
[docs] def variance(self): """Variance of the distribution.""" _self = self._discard_value(None) if not _self.total(): return 0.0 mean = _self.mean() weighted_central_moment = sum( count * (value - mean)**2 for value, count in iteritems(_self) ) return weighted_central_moment / float(_self.total())
[docs] def standard_deviation(self): """Standard deviation of the distribution.""" return math.sqrt(self.variance())
[docs] def normalized(self): """Return a normalized version of the histogram where the values sum to one. """ total = self.total() result = Histogram() for value, count in iteritems(self): try: result[value] = count / float(total) except UnorderableElements as e: result = Histogram.from_dict(dict(result), key=hash) result[value] = count / float(total) return result
def _discard_value(self, value): if value not in self: return self else: return self.__class__.from_dict( {k: v for k, v in iteritems(self) if k is not value} )
[docs] def max(self): """Maximum observed value.""" return self.iloc[-1]
[docs] def min(self): """Minimum observed value.""" return self.iloc[0]
def _quantile_function(self, alpha=0.5, smallest_count=None): """Return a function that returns the quantile values for this histogram. """ total = float(self.total()) smallest_observed_count = min(itervalues(self)) if smallest_count is None: smallest_count = smallest_observed_count else: smallest_count = min(smallest_count, smallest_observed_count) beta = alpha * smallest_count debug_plot = [] cumulative_sum = 0.0 inverse = sortedcontainers.SortedDict() for value, count in iteritems(self): debug_plot.append((cumulative_sum / total, value)) inverse[(cumulative_sum + beta) / total] = value cumulative_sum += count inverse[(cumulative_sum - beta) / total] = value debug_plot.append((cumulative_sum / total, value)) # get maximum and minumum q values q_min = inverse.iloc[0] q_max = inverse.iloc[-1] # this stuff if helpful for debugging -- keep it in here # for i, j in debug_plot: # print i, j # print '' # for i, j in inverse.iteritems(): # print i, j # print '' def function(q): if q < 0.0 or q > 1.0: msg = 'invalid quantile %s, need `0 <= q <= 1`' % q raise ValueError(msg) elif q < q_min: q = q_min elif q > q_max: q = q_max # if beta is if beta > 0: if q in inverse: result = inverse[q] else: previous_index = inverse.bisect_left(q) - 1 x1 = inverse.iloc[previous_index] x2 = inverse.iloc[previous_index + 1] y1 = inverse[x1] y2 = inverse[x2] result = (y2 - y1) * (q - x1) / float(x2 - x1) + y1 else: if q in inverse: previous_index = inverse.bisect_left(q) - 1 x1 = inverse.iloc[previous_index] x2 = inverse.iloc[previous_index + 1] y1 = inverse[x1] y2 = inverse[x2] result = 0.5 * (y1 + y2) else: previous_index = inverse.bisect_left(q) - 1 x1 = inverse.iloc[previous_index] result = inverse[x1] return float(result) return function def median(self, alpha=0.5, smallest_count=None): return self.quantile(0.5, alpha=alpha, smallest_count=smallest_count) def quantiles(self, q_list, alpha=0.5, smallest_count=None): f = self._quantile_function(alpha=alpha, smallest_count=smallest_count) return [f(q) for q in q_list] def quantile(self, q, alpha=0.5, smallest_count=None): return \ self.quantiles([q], alpha=alpha, smallest_count=smallest_count)[0] def add(self, other): result = Histogram() for key, value in iteritems(self): result[key] += value for key, value in iteritems(other): result[key] += value return result __add__ = add