[Common] Add more streaming algorithms

Changes:

 * Refactor classes names for existing streaming algorithms
   (rally.common.streaming_algorithms), as they are too long

 * Add new streaming algorithms classes to module
   rally.common.streaming_algorithms

   All of these classes are intended for further usage in
   reports generation - to make reports generation simplier,
   unified and able to process arbitrary data with low
   memory usage.

   These classes are:

   MinComputation
     simply keep minimal value from stream of numbers

   MaxComputation
     simply keep maximal value from stream of numbers

   PercentileComputation
     calculate percentile value from stream of numbers,
     including median value (with specified percent=50).

     This class will replace the following functions:
     rally.task.processing.utils.median
     rally.task.processing.utils.percentile

   ProgressComputation
     calculate percent how many times add() has been called
     with respect to expected (total) calls number

   IncrementComputation
     simple counter how many times add() has been called

Change-Id: Ib63d34f573a695edf86b309d0b7bb69571b2d95d
This commit is contained in:
Alexander Maretskiy 2015-07-14 17:37:20 +03:00
parent 5a6139f70e
commit 7652ac0e4b
4 changed files with 254 additions and 17 deletions

View File

@ -14,6 +14,7 @@
# under the License.
import abc
import heapq
import math
import six
@ -34,9 +35,15 @@ class StreamingAlgorithm(object):
def result(self):
"""Return the result based on the values processed so far."""
def _cast_to_float(self, value):
try:
return float(value)
except (TypeError, ValueError):
raise TypeError("Non-numerical value: %r" % value)
class MeanStreamingComputation(StreamingAlgorithm):
"""Computes mean for a stream of numbers."""
class MeanComputation(StreamingAlgorithm):
"""Compute mean for a stream of numbers."""
def __init__(self):
self.total = 0.0
@ -54,14 +61,14 @@ class MeanStreamingComputation(StreamingAlgorithm):
return self.total / self.count
class StdDevStreamingComputation(StreamingAlgorithm):
"""Computes the standard deviation for a stream of numbers."""
class StdDevComputation(StreamingAlgorithm):
"""Compute standard deviation for a stream of numbers."""
def __init__(self):
self.count = 0
# NOTE(msdubov): To compute std, we need the auxiliary variables below.
self.dev_sum = 0.0
self.mean_computation = MeanStreamingComputation()
self.mean_computation = MeanComputation()
self.mean = 0.0
def add(self, value):
@ -80,3 +87,117 @@ class StdDevStreamingComputation(StreamingAlgorithm):
"need at least two values to be processed.")
raise exceptions.RallyException(message)
return math.sqrt(self.dev_sum / (self.count - 1))
class MinComputation(StreamingAlgorithm):
"""Compute minimal value from a stream of numbers."""
def __init__(self):
self._value = None
def add(self, value):
value = self._cast_to_float(value)
if self._value is None or value < self._value:
self._value = value
def result(self):
if self._value is None:
raise ValueError("No values have been processed")
return self._value
class MaxComputation(StreamingAlgorithm):
"""Compute maximal value from a stream of numbers."""
def __init__(self):
self._value = None
def add(self, value):
value = self._cast_to_float(value)
if self._value is None or value > self._value:
self._value = value
def result(self):
if self._value is None:
raise ValueError("No values have been processed")
return self._value
class PercentileComputation(StreamingAlgorithm):
"""Compute percentile value from a stream of numbers."""
def __init__(self, percent):
"""Init streaming computation.
:param percent: numeric percent (from 0.1 to 99.9)
"""
if not 0 < percent < 100:
raise ValueError("Unexpected percent: %s" % percent)
self._percent = percent
self._count = 0
self._left = []
self._right = []
self._current_percentile = None
def add(self, value):
value = self._cast_to_float(value)
if self._current_percentile and value > self._current_percentile:
heapq.heappush(self._right, value)
else:
heapq.heappush(self._left, -value)
self._count += 1
expected_left = int(self._percent * (self._count + 1) / 100)
if len(self._left) > expected_left:
heapq.heappush(self._right, -heapq.heappop(self._left))
elif len(self._left) < expected_left:
heapq.heappush(self._left, -heapq.heappop(self._right))
left = -self._left[0] if len(self._left) else 0
right = self._right[0] if len(self._right) else 0
self._current_percentile = left + (right - left) / 2.
def result(self):
if self._current_percentile is None:
raise ValueError("No values have been processed")
return self._current_percentile
class ProgressComputation(StreamingAlgorithm):
"""Compute progress in percent."""
def __init__(self, base_count):
"""Init streaming computation.
:param base_count: int number for end progress (100% reached)
"""
self._base_count = int(base_count) or 1
self._count = 0
def add(self, *args):
if self._count >= self._base_count:
raise RuntimeError(
"100%% progress is already reached (count of %d)"
% self._base_count)
self._count += 1
def result(self):
return self._count / float(self._base_count) * 100
class IncrementComputation(StreamingAlgorithm):
"""Simple incremental counter."""
def __init__(self):
self._count = 0
def add(self, *args):
self._count += 1
def result(self):
return self._count

View File

@ -33,7 +33,7 @@ class MaxAverageDuration(sla.SLA):
def __init__(self, criterion_value):
super(MaxAverageDuration, self).__init__(criterion_value)
self.avg = 0.0
self.avg_comp = streaming_algorithms.MeanStreamingComputation()
self.avg_comp = streaming_algorithms.MeanComputation()
def add_iteration(self, iteration):
if not iteration.get("error"):

View File

@ -52,8 +52,8 @@ class Outliers(sla.SLA):
self.iterations = 0
self.outliers = 0
self.threshold = None
self.mean_comp = streaming_algorithms.MeanStreamingComputation()
self.std_comp = streaming_algorithms.StdDevStreamingComputation()
self.mean_comp = streaming_algorithms.MeanComputation()
self.std_comp = streaming_algorithms.StdDevComputation()
def add_iteration(self, iteration):
if not iteration.get("error"):

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import math
from rally.common import streaming_algorithms as algo
@ -20,49 +22,163 @@ from rally import exceptions
from tests.unit import test
class MeanStreamingComputationTestCase(test.TestCase):
class MeanComputationTestCase(test.TestCase):
def test_empty_stream(self):
mean_computation = algo.MeanStreamingComputation()
mean_computation = algo.MeanComputation()
self.assertRaises(exceptions.RallyException, mean_computation.result)
def test_one_value(self):
mean_computation = algo.MeanStreamingComputation()
mean_computation = algo.MeanComputation()
mean_computation.add(10.0)
self.assertEqual(10.0, mean_computation.result())
def test_stream(self):
stream = range(10)
mean_computation = algo.MeanStreamingComputation()
mean_computation = algo.MeanComputation()
for value in stream:
mean_computation.add(value)
excepted_mean = float(sum(stream)) / len(stream)
self.assertEqual(excepted_mean, mean_computation.result())
class StdDevStreamingComputationTestCase(test.TestCase):
class StdDevComputationTestCase(test.TestCase):
def test_empty_stream(self):
std_computation = algo.StdDevStreamingComputation()
std_computation = algo.StdDevComputation()
self.assertRaises(exceptions.RallyException, std_computation.result)
def test_one_value(self):
std_computation = algo.StdDevStreamingComputation()
std_computation = algo.StdDevComputation()
std_computation.add(10.0)
self.assertRaises(exceptions.RallyException, std_computation.result)
def test_two_values(self):
std_computation = algo.StdDevStreamingComputation()
std_computation = algo.StdDevComputation()
std_computation.add(10.0)
std_computation.add(10.0)
self.assertEqual(0.0, std_computation.result())
def test_stream(self):
stream = range(10)
std_computation = algo.StdDevStreamingComputation()
std_computation = algo.StdDevComputation()
for value in stream:
std_computation.add(value)
mean = float(sum(stream)) / len(stream)
excepted_std = math.sqrt(sum((x - mean) ** 2 for x in stream) /
(len(stream) - 1))
self.assertEqual(excepted_std, std_computation.result())
class MinComputationTestCase(test.TestCase):
def test_add_and_result(self):
comp = algo.MinComputation()
[comp.add(i) for i in [3, 5.2, 2, -1, 1, 8, 33.4, 0, -3, 42, -2]]
self.assertEqual(-3, comp.result())
def test_add_raises(self):
comp = algo.MinComputation()
self.assertRaises(TypeError, comp.add)
self.assertRaises(TypeError, comp.add, None)
self.assertRaises(TypeError, comp.add, "str")
def test_result_raises(self):
comp = algo.MinComputation()
self.assertRaises(TypeError, comp.result, 1)
self.assertRaises(ValueError, comp.result)
class MaxComputationTestCase(test.TestCase):
def test_add_and_result(self):
comp = algo.MaxComputation()
[comp.add(i) for i in [3, 5.2, 2, -1, 1, 8, 33.4, 0, -3, 42, -2]]
self.assertEqual(42, comp.result())
def test_add_raises(self):
comp = algo.MaxComputation()
self.assertRaises(TypeError, comp.add)
self.assertRaises(TypeError, comp.add, None)
self.assertRaises(TypeError, comp.add, "str")
def test_result_raises(self):
comp = algo.MaxComputation()
self.assertRaises(TypeError, comp.result, 1)
self.assertRaises(ValueError, comp.result)
@ddt.ddt
class PercentileComputationTestCase(test.TestCase):
mixed16 = [55.71, 83.05, 24.12, 27, 48.36, 16.36, 96.23, 6, 16.0, 88.11,
29.52, 99.2, 79.96, 77.84, 85.45, 85.32, 7, 17.1, 3.02, 15.23]
mixed50 = [51.63, 82.2, 52.52, .05, 66, 94.03, 78.6, 80.9, 51.89, 79, 1.4,
65.06, 12.46, 51.89, 41, 45.39, 124, 62.2, 32.72, 56.98, 31.19,
26.27, 97.3, 56.6, 19.75, 69, 25.03, 10.76, 17.71, 29.4, 15.75,
19.88, 90.16, 82.0, 63.4, 14.84, 49.07, 72.06, 41, 1.48, 82.19,
48.45, 53, 88.33, 52.31, 62, 15.96, 21.17, 25.33, 53.27]
mixed5000 = mixed50 * 1000
range5000 = range(5000)
@ddt.data(
{"stream": "mixed16", "percent": 25, "expected": 16.18},
{"stream": "mixed16", "percent": 50, "expected": 38.94},
{"stream": "mixed16", "percent": 90, "expected": 92.17},
{"stream": "mixed50", "percent": 25, "expected": 23.1},
{"stream": "mixed50", "percent": 50, "expected": 51.89},
{"stream": "mixed50", "percent": 90, "expected": 85.265},
{"stream": "mixed5000", "percent": 25, "expected": 25.03},
{"stream": "mixed5000", "percent": 50, "expected": 51.89},
{"stream": "mixed5000", "percent": 90, "expected": 85.265},
{"stream": "range5000", "percent": 25, "expected": 1249.5},
{"stream": "range5000", "percent": 50, "expected": 2499.5},
{"stream": "range5000", "percent": 90, "expected": 4499.5})
@ddt.unpack
def test_add_and_result(self, percent, stream, expected):
comp = algo.PercentileComputation(percent=percent)
[comp.add(i) for i in getattr(self, stream)]
self.assertEqual(expected, comp.result())
def test_add_raises(self):
comp = algo.PercentileComputation(50)
self.assertRaises(TypeError, comp.add)
self.assertRaises(TypeError, comp.add, None)
self.assertRaises(TypeError, comp.add, "str")
def test_result_raises(self):
self.assertRaises(TypeError, algo.PercentileComputation)
comp = algo.PercentileComputation(50)
self.assertRaises(ValueError, comp.result)
class ProgressComputationTestCase(test.TestCase):
def test___init__raises(self):
self.assertRaises(TypeError, algo.ProgressComputation)
self.assertRaises(TypeError, algo.ProgressComputation, None)
self.assertRaises(ValueError, algo.ProgressComputation, "str")
def test_add_and_result(self):
comp = algo.ProgressComputation(42)
self.assertEqual(0, comp.result())
for expected_progress in (2.38, 4.76, 7.14, 9.52, 11.9, 14.29,
16.67, 19.05, 21.43):
comp.add(42)
self.assertEqual(expected_progress, round(comp.result(), 2))
def test_add_raises(self):
comp = algo.ProgressComputation(42)
[comp.add(123) for i in range(42)]
self.assertRaises(RuntimeError, comp.add, None)
self.assertRaises(RuntimeError, comp.add, 123)
class IncrementComputationTestCase(test.TestCase):
def test_add_and_result(self):
comp = algo.IncrementComputation()
for i in range(1, 100):
self.assertEqual(i - 1, comp.result())
comp.add(42)
self.assertEqual(i, comp.result())